+1. Thanks.
On Jun 29, 2012 2:48 AM, "Gabriel Reid" <[email protected]> wrote:

> Yep, that's an interesting one as well. I'll fix the value re-use
> issue with the change to PType if that sounds ok to you (or anyone
> else), and I'll take a look at what can be done about this as well if
> possible.
>
> - Gabriel
>
> On Thu, Jun 28, 2012 at 11:41 PM, Josh Wills <[email protected]> wrote:
> > Yeah, that's no good. I had a similar case w/ a project my intern was
> > working on, where he created a PType that was:
> >
> > PType<Pair<String, String>> kv = pairs(strings(), strings());
> > ...
> > tableOf(kv, kv);
> >
> > ... which also fails because the output mapfn returned by kv is
> > stateful. Easy to remedy it in the code, but confusing for the user in
> > the same way. It would be nice for the PType to know if its MapFns
> > were stateful so that a new instance of them could be returned each
> > time PType.getInputMapFn or getOutputMapFn was called.
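A minimal, self-contained sketch of the stateful-MapFn problem and of the remedy suggested above (handing out a fresh instance on each call). It does not use Crunch: `ReusingPairFormatter`, `SketchPType` and `StatefulMapFnDemo` are hypothetical names, and `java.util.function.Function` stands in for `MapFn`. The formatter re-uses one buffer, so when the same instance encodes both the key and the value (as with `tableOf(kv, kv)`), the value overwrites the key.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for a stateful output MapFn: it re-uses one
// mutable buffer across calls instead of allocating a new result.
final class ReusingPairFormatter implements Function<String[], StringBuilder> {
  private final StringBuilder buffer = new StringBuilder();

  @Override
  public StringBuilder apply(String[] pair) {
    buffer.setLength(0); // wipe the previous result
    buffer.append(pair[0]).append('\t').append(pair[1]);
    return buffer; // the same object on every call
  }
}

// Hypothetical PType-like holder that hands out its output function.
final class SketchPType {
  private final Supplier<Function<String[], StringBuilder>> factory;
  private final Function<String[], StringBuilder> shared;

  SketchPType(Supplier<Function<String[], StringBuilder>> factory) {
    this.factory = factory;
    this.shared = factory.get();
  }

  // Current behaviour: every caller gets the same stateful instance.
  Function<String[], StringBuilder> getSharedOutputMapFn() {
    return shared;
  }

  // Proposed behaviour: every caller gets its own fresh instance.
  Function<String[], StringBuilder> getFreshOutputMapFn() {
    return factory.get();
  }
}

final class StatefulMapFnDemo {
  // Encodes a key pair and a value pair with functions from the same PType.
  static String encodeKeyValue(boolean fresh) {
    SketchPType kv = new SketchPType(ReusingPairFormatter::new);
    Function<String[], StringBuilder> keyFn =
        fresh ? kv.getFreshOutputMapFn() : kv.getSharedOutputMapFn();
    Function<String[], StringBuilder> valueFn =
        fresh ? kv.getFreshOutputMapFn() : kv.getSharedOutputMapFn();
    StringBuilder key = keyFn.apply(new String[] {"k1", "k2"});
    StringBuilder value = valueFn.apply(new String[] {"v1", "v2"});
    return key + " => " + value;
  }
}
```

`encodeKeyValue(false)` shows the value pair on both sides of `=>`, while `encodeKeyValue(true)` keeps the key pair intact.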
> >
> > On Thu, Jun 28, 2012 at 2:32 PM, Gabriel Reid <[email protected]> wrote:
> >> Hmm, strange...according to my mail client the attachment was in
> >> there. Anyhow, I've pasted it inline below:
> >>
> >> package com.cloudera.crunch;
> >>
> >> import static org.junit.Assert.assertEquals;
> >>
> >> import java.io.DataInput;
> >> import java.io.DataOutput;
> >> import java.io.IOException;
> >> import java.io.Serializable;
> >> import java.util.Collection;
> >> import java.util.Collections;
> >> import java.util.List;
> >> import java.util.Map;
> >>
> >> import org.apache.hadoop.io.Writable;
> >> import org.junit.Test;
> >>
> >> import com.cloudera.crunch.impl.mr.MRPipeline;
> >> import com.cloudera.crunch.test.FileHelper;
> >> import com.cloudera.crunch.types.writable.Writables;
> >> import com.google.common.collect.Lists;
> >>
> >> public class SerializationReducerTest implements Serializable {
> >>
> >>  public static class SimpleStringWritable implements Writable {
> >>
> >>    private String value;
> >>
> >>    public void setValue(String value) {
> >>      this.value = value;
> >>    }
> >>
> >>    public String getValue() {
> >>      return value;
> >>    }
> >>
> >>    @Override
> >>    public String toString() {
> >>      return String.format("SimpleStringWritable(%s)", value);
> >>    }
> >>
> >>    @Override
> >>    public void write(DataOutput out) throws IOException {
> >>      out.writeUTF(value);
> >>    }
> >>
> >>    @Override
> >>    public void readFields(DataInput in) throws IOException {
> >>      this.value = in.readUTF();
> >>    }
> >>
> >>  }
> >>
> >>  static SimpleStringWritable asSimple(String value) {
> >>    SimpleStringWritable simpleStringWritable = new SimpleStringWritable();
> >>    simpleStringWritable.setValue(value);
> >>    return simpleStringWritable;
> >>  }
> >>
> >>  static List<String> simplesToList(
> >>      Collection<SimpleStringWritable> simpleCollection) {
> >>    List<String> stringList = Lists.newArrayList();
> >>    for (SimpleStringWritable writable : simpleCollection) {
> >>      stringList.add(writable.getValue());
> >>    }
> >>    Collections.sort(stringList);
> >>    return stringList;
> >>  }
> >>
> >>  @Test
> >>  public void testWritables() throws IOException {
> >>    Pipeline pipeline = new MRPipeline(SerializationReducerTest.class);
> >>    Map<Integer, Collection<SimpleStringWritable>> collectionMap = pipeline
> >>        .readTextFile(FileHelper.createTempCopyOf("set1.txt"))
> >>        .parallelDo(new MapFn<String, Pair<Integer, SimpleStringWritable>>() {
> >>
> >>          @Override
> >>          public Pair<Integer, SimpleStringWritable> map(String input) {
> >>            return Pair.of(1, asSimple(input));
> >>          }
> >>
> >>        }, Writables.tableOf(Writables.ints(),
> >>            Writables.writables(SimpleStringWritable.class)))
> >>        .collectValues().materializeToMap();
> >>
> >>    assertEquals(1, collectionMap.size());
> >>
> >>    // Fails: because Hadoop re-uses the Writable value instance, the
> >>    // actual content will just be ["e", "e", "e", "e"]
> >>    assertEquals(Lists.newArrayList("a", "b", "c", "e"),
> >>        simplesToList(collectionMap.get(1)));
> >>  }
> >>
> >>
> >> }
> >>
> >>
> >> On Thu, Jun 28, 2012 at 11:28 PM, Josh Wills <[email protected]> wrote:
> >>> Gabriel,
> >>>
> >>> Generally agree with your line of thought-- where is the attached
> >>> test case?
> >>>
> >>> J
> >>>
> >>> On Thu, Jun 28, 2012 at 2:11 PM, Gabriel Reid <[email protected]> wrote:
> >>>> Hi guys,
> >>>>
> >>>> As you may have seen, the topic of the PTable#collectValues method
> >>>> came up today on the user mailing list. I hadn't been aware of this
> >>>> method before, and when I took a closer look I saw that it just
> >>>> creates a Collection of values based on the incoming Iterable,
> >>>> without doing any kind of deep copy of the contents of the Iterable.
> >>>> As far as I can see, something similar (i.e. holding on to values
> >>>> from an Iterable from a reducer) is also done in the Join methods.
> >>>>
> >>>> As Christian also pointed out (and added to the documentation for
> >>>> DoFn), this can be an issue, as the values made available as an
> >>>> Iterable in a reducer are re-used by Hadoop (the same object instance
> >>>> is refilled for each value).
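The re-use described here can be reproduced without Hadoop. In this sketch (all class names are hypothetical), `ReusingIterable` refills a single mutable object for each value, roughly the way Hadoop's reducer Iterable behaves, so keeping references yields only the last value, which matches the `["e", "e", "e", "e"]` result noted in the test case in this thread. Copying each value before the iterator advances avoids the problem.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical mutable value, standing in for a Writable such as
// SimpleStringWritable.
final class MutableText {
  String value;
}

// Mimics a reducer Iterable that refills one object per value.
final class ReusingIterable implements Iterable<MutableText> {
  private final List<String> raw;

  ReusingIterable(List<String> raw) {
    this.raw = raw;
  }

  @Override
  public Iterator<MutableText> iterator() {
    final MutableText shared = new MutableText(); // one instance per pass
    final Iterator<String> source = raw.iterator();
    return new Iterator<MutableText>() {
      @Override
      public boolean hasNext() {
        return source.hasNext();
      }

      @Override
      public MutableText next() {
        shared.value = source.next(); // overwrite in place
        return shared;
      }
    };
  }
}

final class ValueReuseDemo {
  // Holds on to the references themselves, like a non-copying collectValues.
  static List<String> collectReferences(Iterable<MutableText> values) {
    List<MutableText> kept = new ArrayList<>();
    for (MutableText v : values) {
      kept.add(v);
    }
    return valuesOf(kept);
  }

  // Copies each value before the iterator moves on.
  static List<String> collectCopies(Iterable<MutableText> values) {
    List<MutableText> kept = new ArrayList<>();
    for (MutableText v : values) {
      MutableText copy = new MutableText();
      copy.value = v.value;
      kept.add(copy);
    }
    return valuesOf(kept);
  }

  private static List<String> valuesOf(List<MutableText> texts) {
    List<String> out = new ArrayList<>();
    for (MutableText t : texts) {
      out.add(t.value);
    }
    return out;
  }
}
```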
> >>>>
> >>>> This object re-use isn't a problem in Crunch wherever a non-identity
> >>>> mapping is used between the serialization type and the PCollection
> >>>> type within the PType (for example, with primitives and String).
> >>>> However, using Writable types or non-mapped Avro types won't work (as
> >>>> shown in the attached test case).
> >>>>
> >>>> I think it's definitely a problem that PTable#collectValues (and
> >>>> probably some other methods) doesn't work for Writables, or in a
> >>>> broader sense, that the semantics of the Iterable passed in when
> >>>> processing a grouped table can change depending on the PType.
> >>>>
> >>>> One really easy (but also inefficient) way we could solve this would
> >>>> be to not use an IdentityFn as the default mapping function in
> >>>> Writables and AvroType, and instead use a MapFn that does a deep copy
> >>>> of the object (i.e. by serializing and deserializing it in memory).
> >>>> This is of course a pretty big overhead for something that isn't
> >>>> necessary in a lot of cases.
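A sketch of the first option: a deep-copy mapping function that serializes a value into an in-memory buffer and reads it back into a fresh instance. `SketchWritable` is a hypothetical stand-in that mirrors the `write(DataOutput)`/`readFields(DataInput)` pair of Hadoop's `Writable` (so the code compiles without Hadoop), `StringHolder` plays the role of `SimpleStringWritable`, and `UnaryOperator` stands in for `MapFn`. Every call pays for a full serialization round trip, which is the overhead the paragraph mentions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for org.apache.hadoop.io.Writable.
interface SketchWritable {
  void write(DataOutput out) throws IOException;

  void readFields(DataInput in) throws IOException;
}

// Same shape as SimpleStringWritable in the test case.
final class StringHolder implements SketchWritable {
  String value;

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(value);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    value = in.readUTF();
  }
}

// Deep copy by serializing to bytes and deserializing into a new instance.
final class DeepCopyFn<T extends SketchWritable> implements UnaryOperator<T> {
  private final Supplier<T> factory; // creates the empty target instance

  DeepCopyFn(Supplier<T> factory) {
    this.factory = factory;
  }

  @Override
  public T apply(T input) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      input.write(out);
      out.flush();
      T copy = factory.get();
      copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      return copy;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```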
> >>>>
> >>>> Another option I was considering was to do something like making the
> >>>> input and output PTypes of a DoFn available to the DoFn, and adding a
> >>>> createDetachedValue method (or something similar) to PType, which
> >>>> would then serialize and deserialize objects in order to make a clone
> >>>> if necessary. With this approach, the clone method would have to be
> >>>> called within the collectValues method (or any other method that is
> >>>> holding on to values outside of the iterator).
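A rough sketch of the second option, assuming a pared-down, hypothetical `DetachingPType` interface that models only the proposed `createDetachedValue` method (this is not Crunch's actual `PType` API). Immutable types return the value unchanged, a mutable type returns a copy (here via a plain field copy for brevity, rather than the serialize/deserialize round trip described in the mail), and a `collectValues`-style helper detaches each value before the iterator advances. `Box`, `BoxPType` and `CollectValuesSketch` are also hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

// Hypothetical PType reduced to the proposed method.
interface DetachingPType<T> {
  // Returns a value that is safe to keep after the iterator advances.
  T createDetachedValue(T value);
}

// Immutable values (e.g. String) need no copy.
final class ImmutablePType<T> implements DetachingPType<T> {
  @Override
  public T createDetachedValue(T value) {
    return value;
  }
}

// Hypothetical mutable value type.
final class Box {
  String value;
}

// Mutable values are cloned; a field copy stands in for serialization.
final class BoxPType implements DetachingPType<Box> {
  @Override
  public Box createDetachedValue(Box value) {
    Box copy = new Box();
    copy.value = value.value;
    return copy;
  }
}

final class CollectValuesSketch {
  // Detaches every value while it is still current.
  static <V> Collection<V> collectValues(Iterable<V> values, DetachingPType<V> ptype) {
    List<V> out = new ArrayList<>();
    for (V v : values) {
      out.add(ptype.createDetachedValue(v));
    }
    return out;
  }

  // Simulates a reducer Iterable that refills one shared Box.
  static Iterable<Box> reusing(final List<String> raw) {
    return () -> {
      final Box shared = new Box();
      final Iterator<String> source = raw.iterator();
      return new Iterator<Box>() {
        @Override
        public boolean hasNext() {
          return source.hasNext();
        }

        @Override
        public Box next() {
          shared.value = source.next();
          return shared;
        }
      };
    };
  }
}
```

Because the copy decision lives in the PType, types whose values are already safe to keep pay no copying cost.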
> >>>>
> >>>> I prefer the second approach, as it avoids the waste of extra
> >>>> cloning/serialization while still making it possible to get detached
> >>>> values out of an Iterable.
> >>>>
> >>>> Does anyone else have any thoughts on this?
> >>>>
> >>>> - Gabriel
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> Director of Data Science
> >>> Cloudera
> >>> Twitter: @josh_wills
>
