[ 
https://issues.apache.org/jira/browse/CRUNCH-601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425337#comment-15425337
 ] 

Mikael Goldmann commented on CRUNCH-601:
----------------------------------------

This (on top of the patch) passes the test even for length zero (but there may 
of course be other corner cases as long as one trusts getSize() == 0).
{code}
  // In crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
  public static <S> PObject<Long> length(PCollection<S> collect) {
    PTypeFamily tf = collect.getTypeFamily();
    PTable<Integer, Long> countTable = collect
        .parallelDo("Aggregate.count", new MapFn<S, Pair<Integer, Long>>() {
          public Pair<Integer, Long> map(S input) {
            return Pair.of(1, 1L);
          }
          public void cleanup(Emitter<Pair<Integer, Long>> e) {
            e.emit(Pair.of(1, 0L));
          }
        }, tf.tableOf(tf.ints(), tf.longs()))
        .groupByKey(GroupingOptions.builder().numReducers(1).build())
        .combineValues(Aggregators.SUM_LONGS());
    PCollection<Long> count = countTable.values();
    final FirstElementPObject<Long> first = new FirstElementPObject<>(count);
    return new PObject<Long>() {
      @Override
      public Long getValue() {
        final Long value = first.getValue();
        return value == null ? 0 : value;
      }
    };
  }
{code}

> Short PCollections in SparkPipeline get length null.
> ----------------------------------------------------
>
>                 Key: CRUNCH-601
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-601
>             Project: Crunch
>          Issue Type: Bug
>          Components: Spark
>    Affects Versions: 0.13.0
>         Environment: Running in local mode on Mac as well as in a ubuntu 
> 14.04 docker container
>            Reporter: Mikael Goldmann
>            Priority: Minor
>         Attachments: CRUNCH-601.patch, SmallCollectionLengthTest.java
>
>
> I'll attach a file with a test that I would expect to pass but which fails.
> It creates five PCollection<String> of lengths 0, 1, 2, 3, 4 gets the 
> lengths, runs the pipeline and prints the lengths. Finally it asserts that 
> all lengths are non-null.
> I would expect it to print lengths 0, 1, 2, 3, 4 and pass.
> What it does is print lengths null, null, null, 3, 4 and fail.
> I think the underlying reason is the use of getSize() on an unmaterialized 
> object and assuming that when the estimate that getSize() returns is 0, then 
> the PCollection is guaranteed to be empty, which is false in some cases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to