Updated Branches:
  refs/heads/master 22a33dd56 -> 883c565a3

CRUNCH-71 - Initialize PType MapFns for deep copy

Add an initialize() method to PTypes that initializes their input
and output MapFns, so that PTypes can be reliably used for deep
copying of values within DoFns.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/883c565a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/883c565a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/883c565a

Branch: refs/heads/master
Commit: 883c565a31c914dd004c649fa46df1e47fb85bba
Parents: 22a33dd
Author: Gabriel Reid <[email protected]>
Authored: Thu Sep 20 14:26:40 2012 +0200
Committer: Gabriel Reid <[email protected]>
Committed: Thu Sep 20 20:21:26 2012 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/lib/Aggregate.java |    7 +++++-
 .../apache/crunch/lib/join/FullOuterJoinFn.java    |    1 +
 .../org/apache/crunch/lib/join/InnerJoinFn.java    |    1 +
 .../java/org/apache/crunch/lib/join/JoinFn.java    |    6 +++++
 .../apache/crunch/lib/join/LeftOuterJoinFn.java    |    1 +
 .../apache/crunch/lib/join/RightOuterJoinFn.java   |    1 +
 .../main/java/org/apache/crunch/types/PType.java   |    8 ++++++
 .../crunch/types/avro/AvroGroupedTableType.java    |    5 ++++
 .../org/apache/crunch/types/avro/AvroType.java     |    5 ++++
 .../types/writable/WritableGroupedTableType.java   |    5 ++++
 .../crunch/types/writable/WritableTableType.java   |    6 +++++
 .../apache/crunch/types/writable/WritableType.java |   14 +++++++++++
 .../writable/WritableGroupedTableTypeTest.java     |    1 +
 .../types/writable/WritableTableTypeTest.java      |    2 +-
 .../crunch/types/writable/WritableTypeTest.java    |   18 +++++++++++---
 15 files changed, 75 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
index a0588e0..dc3de7c 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -20,7 +20,6 @@ package org.apache.crunch.lib;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
 
@@ -245,6 +244,12 @@ public class Aggregate {
     PTypeFamily tf = collect.getTypeFamily();
     final PType<V> valueType = collect.getValueType();
     return collect.groupByKey().parallelDo("collect", new MapValuesFn<K, 
Iterable<V>, Collection<V>>() {
+
+      @Override
+      public void initialize() {
+        valueType.initialize();
+      }
+
       public Collection<V> map(Iterable<V> values) {
         List<V> collected = Lists.newArrayList();
         for (V value : values) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
index 3c63f07..0ceb382 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
@@ -48,6 +48,7 @@ public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
   /** {@inheritDoc} */
   @Override
   public void initialize() {
+    super.initialize();
     lastId = 1;
     lastKey = null;
     this.leftValues = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
index bbcc35f..5275c95 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
@@ -47,6 +47,7 @@ public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
   /** {@inheritDoc} */
   @Override
   public void initialize() {
+    super.initialize();
     lastKey = null;
     this.leftValues = Lists.newArrayList();
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
index f45ce9c..dab6c34 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
@@ -51,6 +51,12 @@ public abstract class JoinFn<K, U, V> extends DoFn<Pair<Pair<K, Integer>, Iterab
     this.leftValueType = leftValueType;
   }
 
+  @Override
+  public void initialize() {
+    this.keyType.initialize();
+    this.leftValueType.initialize();
+  }
+
   /** @return The name of this join type (e.g. innerJoin, leftOuterJoin). */
   public abstract String getJoinType();
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
index 272e081..116353a 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
@@ -48,6 +48,7 @@ public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
   /** {@inheritDoc} */
   @Override
   public void initialize() {
+    super.initialize();
     lastId = 1;
     lastKey = null;
     this.leftValues = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
index 2dbb2f9..51b74cc 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
@@ -47,6 +47,7 @@ public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
   /** {@inheritDoc} */
   @Override
   public void initialize() {
+    super.initialize();
     lastKey = null;
     this.leftValues = Lists.newArrayList();
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/types/PType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PType.java b/crunch/src/main/java/org/apache/crunch/types/PType.java
index a60ce62..bbe8a4b 100644
--- a/crunch/src/main/java/org/apache/crunch/types/PType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/PType.java
@@ -52,6 +52,14 @@ public interface PType<T> extends Serializable {
   Converter getConverter();
 
   /**
+   * Initialize this PType for use within a DoFn. This generally only needs to
+   * be called when using a PType for {@link #getDetachedValue(Object)}.
+   * 
+   * @see PType#getDetachedValue(Object)
+   */
+  void initialize();
+
+  /**
    * Returns a copy of a value (or the value itself) that can safely be
    * retained.
    * <p>

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
index ab68e71..aa5b5dc 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
@@ -72,6 +72,11 @@ public class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
   }
 
   @Override
+  public void initialize() {
+    // No initialization needed for Avro PTypes
+  }
+
+  @Override
   public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
     return PTables.getGroupedDetachedValue(this, value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
index 4997157..a127baa 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -163,6 +163,11 @@ public class AvroType<T> implements PType<T> {
     return new AvroFileSourceTarget<T>(path, this);
   }
 
+  @Override
+  public void initialize() {
+    // No initialization needed for Avro PTypes
+  }
+
   public T getDetachedValue(T value) {
     return deepCopier.deepCopy(value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
index 3c9312a..98afb4d 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
@@ -61,6 +61,11 @@ public class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
   }
 
   @Override
+  public void initialize() {
+    this.tableType.initialize();
+  }
+
+  @Override
   public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
     return PTables.getGroupedDetachedValue(this, value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
index fc6dd04..6bb6c5d 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
@@ -101,6 +101,12 @@ class WritableTableType<K, V> implements PTableType<K, V> {
   }
 
   @Override
+  public void initialize() {
+    keyType.initialize();
+    valueType.initialize();
+  }
+
+  @Override
   public Pair<K, V> getDetachedValue(Pair<K, V> value) {
     return PTables.getDetachedValue(this, value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
index 23c95ea..71f81f4 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
@@ -41,6 +41,7 @@ public class WritableType<T, W extends Writable> implements PType<T> {
   private final MapFn<T, W> outputFn;
   private final DeepCopier<W> deepCopier;
   private final List<PType> subTypes;
+  private boolean initialized = false;
 
   WritableType(Class<T> typeClass, Class<W> writableClass, MapFn<W, T> 
inputDoFn, MapFn<T, W> outputDoFn,
       PType... subTypes) {
@@ -102,7 +103,20 @@ public class WritableType<T, W extends Writable> implements PType<T> {
   }
 
   @Override
+  public void initialize() {
+    this.inputFn.initialize();
+    this.outputFn.initialize();
+    for (PType subType : subTypes) {
+      subType.initialize();
+    }
+    this.initialized = true;
+  }
+
+  @Override
   public T getDetachedValue(T value) {
+    if (!initialized) {
+      throw new IllegalStateException("Cannot call getDetachedValue on an 
uninitialized PType");
+    }
     W writableValue = outputFn.map(value);
     W deepCopy = this.deepCopier.deepCopy(writableValue);
     return inputFn.map(deepCopy);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java
index 3596f13..1699f3c 100644
--- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java
@@ -41,6 +41,7 @@ public class WritableGroupedTableTypeTest {
 
     PGroupedTableType<Integer, Text> groupedTableType = 
Writables.tableOf(Writables.ints(),
         Writables.writables(Text.class)).getGroupedTableType();
+    groupedTableType.initialize();
 
     Pair<Integer, Iterable<Text>> detachedPair = 
groupedTableType.getDetachedValue(pair);
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java
index f27e7b7..ae68e7a 100644
--- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java
@@ -34,7 +34,7 @@ public class WritableTableTypeTest {
     Pair<Integer, Text> pair = Pair.of(integerValue, textValue);
 
     WritableTableType<Integer, Text> tableType = 
Writables.tableOf(Writables.ints(), Writables.writables(Text.class));
-
+    tableType.initialize();
     Pair<Integer, Text> detachedPair = tableType.getDetachedValue(pair);
 
     assertSame(integerValue, detachedPair.first());

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/883c565a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
index 51a87f5..bea953d 100644
--- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java
@@ -19,8 +19,6 @@ package org.apache.crunch.types.writable;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.fail;
 
 import java.util.Collection;
 import java.util.Map;
@@ -35,9 +33,20 @@ import com.google.common.collect.Maps;
 
 public class WritableTypeTest {
 
+  @Test(expected = IllegalStateException.class)
+  public void testGetDetachedValue_NotInitialized() {
+    WritableType<Text, Text> textWritableType = 
Writables.writables(Text.class);
+    Text value = new Text("test");
+
+    // Calling getDetachedValue without first calling initialize should throw 
an
+    // exception
+    textWritableType.getDetachedValue(value);
+  }
+
   @Test
   public void testGetDetachedValue_CustomWritable() {
     WritableType<Text, Text> textWritableType = 
Writables.writables(Text.class);
+    textWritableType.initialize();
     Text value = new Text("test");
 
     Text detachedValue = textWritableType.getDetachedValue(value);
@@ -50,6 +59,7 @@ public class WritableTypeTest {
     Collection<Text> textCollection = Lists.newArrayList(new Text("value"));
     WritableType<Collection<Text>, GenericArrayWritable<Text>> ptype = 
Writables.collections(Writables
         .writables(Text.class));
+    ptype.initialize();
 
     Collection<Text> detachedCollection = 
ptype.getDetachedValue(textCollection);
     assertEquals(textCollection, detachedCollection);
@@ -61,8 +71,7 @@ public class WritableTypeTest {
     Pair<Text, Text> textPair = Pair.of(new Text("one"), new Text("two"));
     WritableType<Pair<Text, Text>, TupleWritable> ptype = 
Writables.pairs(Writables.writables(Text.class),
         Writables.writables(Text.class));
-    ptype.getOutputMapFn().initialize();
-    ptype.getInputMapFn().initialize();
+    ptype.initialize();
 
     Pair<Text, Text> detachedPair = ptype.getDetachedValue(textPair);
     assertEquals(textPair, detachedPair);
@@ -76,6 +85,7 @@ public class WritableTypeTest {
     stringTextMap.put("key", new Text("value"));
 
     WritableType<Map<String, Text>, MapWritable> ptype = 
Writables.maps(Writables.writables(Text.class));
+    ptype.initialize();
     Map<String, Text> detachedMap = ptype.getDetachedValue(stringTextMap);
 
     assertEquals(stringTextMap, detachedMap);

Reply via email to