Updated Branches:
  refs/heads/master 78a3eb7d9 -> 0ba630bb1

CRUNCH-117: Allow custom subclasses of Tuple to work correctly with the 
DeepCopier logic

Signed-off-by: Gabriel Reid <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/0ba630bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/0ba630bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/0ba630bb

Branch: refs/heads/master
Commit: 0ba630bb10765c2e74283b6c23560f55f9ca4e74
Parents: 78a3eb7
Author: Josh Wills <[email protected]>
Authored: Mon Nov 19 16:05:59 2012 -0800
Committer: Gabriel Reid <[email protected]>
Committed: Wed Nov 21 22:15:01 2012 +0100

----------------------------------------------------------------------
 .../org/apache/crunch/CollectionPObjectIT.java     |   10 --
 .../org/apache/crunch/CollectionsLengthIT.java     |    6 -
 .../org/apache/crunch/DeepCopyCustomTuplesIT.java  |   79 +++++++++++++++
 .../org/apache/crunch/FirstElementPObjectIT.java   |   11 --
 .../src/it/java/org/apache/crunch/PObjectsIT.java  |    9 --
 .../src/it/java/org/apache/crunch/PageRankIT.java  |    1 -
 .../it/java/org/apache/crunch/lib/AggregateIT.java |    2 -
 .../java/org/apache/crunch/types/TupleFactory.java |   18 +++-
 .../org/apache/crunch/types/TupleFactoryTest.java  |    4 +-
 9 files changed, 96 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java b/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java
index 2157d24..7e0c75c 100644
--- a/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java
+++ b/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java
@@ -18,11 +18,8 @@
 package org.apache.crunch;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.lang.Integer;
-import java.lang.Iterable;
 import java.lang.String;
 import java.util.Collection;
 
@@ -32,18 +29,11 @@ import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.materialize.pobject.CollectionPObject;
-import org.apache.crunch.materialize.pobject.PObjectImpl;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.junit.Rule;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
 @SuppressWarnings("serial")
 public class CollectionPObjectIT {
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java b/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
index 8fb03ee..3a38b92 100644
--- a/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
+++ b/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java
@@ -18,13 +18,10 @@
 package org.apache.crunch;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.lang.Long;
-import java.util.Collection;
 
-import org.apache.crunch.PObject;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.TemporaryPath;
@@ -35,9 +32,6 @@ import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.junit.Rule;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
 @SuppressWarnings("serial")
 public class CollectionsLengthIT {
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java b/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java
new file mode 100644
index 0000000..f1323ca
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PType;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * CRUNCH-117: checks that a custom {@link Pair} subclass inside grouped values works with the DeepCopier logic.
+ */
+public class DeepCopyCustomTuplesIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  public static class PID extends Pair<Integer, String> {
+    public PID(Integer first, String second) {
+      super(first, second);
+    }
+  }
+
+  private static final PType<PID> pids = tuples(PID.class, ints(), strings());
+
+  @Test
+  public void testDeepCopyCustomTuple() throws Exception {
+    Pipeline p = new MRPipeline(DeepCopyCustomTuplesIT.class, tmpDir.getDefaultConfiguration());
+    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
+    PCollection<String> shakes = p.readTextFile(shakesInputPath);
+    Iterable<String> out = shakes
+        .parallelDo(new PreProcFn(), tableOf(ints(), pairs(ints(), pids)))
+        .groupByKey()
+        .parallelDo(new PostProcFn(), strings())
+        .materialize();
+    assertEquals(65, Iterables.size(out));
+    p.done();
+  }
+
+  private static class PreProcFn extends MapFn<String, Pair<Integer, Pair<Integer, PID>>> {
+    private int counter = 0;
+    @Override
+    public Pair<Integer, Pair<Integer, PID>> map(String input) {
+      return Pair.of(counter++, Pair.of(counter++, new PID(input.length(), input)));
+    }
+  }
+
+  private static class PostProcFn extends DoFn<Pair<Integer, Iterable<Pair<Integer, PID>>>, String> {
+    @Override
+    public void process(Pair<Integer, Iterable<Pair<Integer, PID>>> input, Emitter<String> emitter) {
+      for (Pair<Integer, PID> p : input.second()) {
+        if (p.second().first() > 0 && p.second().first() < 10) { // PID.first() holds the line length
+          emitter.emit(p.second().second());
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java b/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java
index 3efcf30..d985e10 100644
--- a/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java
+++ b/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java
@@ -18,31 +18,20 @@
 package org.apache.crunch;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.lang.Integer;
-import java.lang.Iterable;
 import java.lang.String;
-import java.util.Collection;
 
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.materialize.pobject.FirstElementPObject;
-import org.apache.crunch.materialize.pobject.PObjectImpl;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.junit.Rule;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
 @SuppressWarnings("serial")
 public class FirstElementPObjectIT {
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/it/java/org/apache/crunch/PObjectsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PObjectsIT.java b/crunch/src/it/java/org/apache/crunch/PObjectsIT.java
index a3810dc..6ee849f 100644
--- a/crunch/src/it/java/org/apache/crunch/PObjectsIT.java
+++ b/crunch/src/it/java/org/apache/crunch/PObjectsIT.java
@@ -18,31 +18,22 @@
 package org.apache.crunch;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.lang.Integer;
 import java.lang.Iterable;
 import java.lang.String;
-import java.util.Collection;
 import java.util.Iterator;
 
 import org.apache.crunch.PCollection;
-import org.apache.crunch.PObject;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.materialize.pobject.PObjectImpl;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.junit.Rule;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
 @SuppressWarnings("serial")
 public class PObjectsIT {
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/it/java/org/apache/crunch/PageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PageRankIT.java b/crunch/src/it/java/org/apache/crunch/PageRankIT.java
index 5a555b3..6291ef8 100644
--- a/crunch/src/it/java/org/apache/crunch/PageRankIT.java
+++ b/crunch/src/it/java/org/apache/crunch/PageRankIT.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.crunch.PObject;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.lib.Aggregate;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java b/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
index 88596aa..56ee3ac 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
@@ -28,7 +28,6 @@ import java.util.Map;
 
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
-import org.apache.crunch.PObject;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
@@ -48,7 +47,6 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 public class AggregateIT {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java b/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
index c547cd6..69fbc92 100644
--- a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
@@ -19,6 +19,7 @@ package org.apache.crunch.types;
 
 import java.io.Serializable;
 import java.lang.reflect.Constructor;
+import java.util.Map;
 
 import org.apache.crunch.Pair;
 import org.apache.crunch.Tuple;
@@ -27,6 +28,8 @@ import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
 import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 
+import com.google.common.collect.Maps;
+
 public abstract class TupleFactory<T extends Tuple> implements Serializable {
 
   public void initialize() {
@@ -34,9 +37,11 @@ public abstract class TupleFactory<T extends Tuple> implements Serializable {
 
   public abstract T makeTuple(Object... values);
 
+  /** Factories registered by {@link #create}, keyed by tuple class. Static and mutable, hence a concurrent map. */
+  private static final Map<Class<?>, TupleFactory<?>> customTupleFactories = Maps.newConcurrentMap();
+
   /**
-   * Get the {@link TupleFactory} for a given Tuple implementation. Only
-   * standard Tuple implementations are supported.
+   * Get the {@link TupleFactory} for a Tuple class; custom classes must first be registered via {@link #create}.
    * 
    * @param tupleClass
    *          The class for which the factory is to be retrieved
@@ -51,6 +56,8 @@ public abstract class TupleFactory<T extends Tuple> implements Serializable {
       return (TupleFactory<T>) TUPLE4;
     } else if (tupleClass == TupleN.class) {
       return (TupleFactory<T>) TUPLEN;
+    } else if (customTupleFactories.containsKey(tupleClass)) {
+      return (TupleFactory<T>) customTupleFactories.get(tupleClass);
     } else {
       throw new IllegalArgumentException("Can't create TupleFactory for " + 
tupleClass);
     }
@@ -85,7 +92,12 @@ public abstract class TupleFactory<T extends Tuple> implements Serializable {
   };
 
   public static <T extends Tuple> TupleFactory<T> create(Class<T> clazz, 
Class... typeArgs) {
-    return new CustomTupleFactory<T>(clazz, typeArgs);
+    if (customTupleFactories.containsKey(clazz)) {
+      return (TupleFactory<T>) customTupleFactories.get(clazz);
+    }
+    TupleFactory<T> custom = new CustomTupleFactory<T>(clazz, typeArgs);
+    customTupleFactories.put(clazz, custom);
+    return custom;
   }
 
   private static class CustomTupleFactory<T extends Tuple> extends 
TupleFactory<T> {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java b/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java
index 25b0371..0726be2 100644
--- a/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java
@@ -48,9 +48,10 @@ public class TupleFactoryTest {
     assertEquals(TupleFactory.TUPLEN, TupleFactory.getTupleFactory(TupleN.class));
   }
 
-  @Test(expected = IllegalArgumentException.class)
+  @Test
   public void testGetTupleFactory_CustomTupleClass() {
-    TupleFactory.getTupleFactory(CustomTupleImplementation.class);
+    TupleFactory<CustomTupleImplementation> customTupleFactory = TupleFactory.create(CustomTupleImplementation.class);
+    assertEquals(customTupleFactory, TupleFactory.getTupleFactory(CustomTupleImplementation.class));
   }
 
   private static class CustomTupleImplementation implements Tuple {

Reply via email to