[ 
https://issues.apache.org/jira/browse/BEAM-1948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976937#comment-15976937
 ] 

ASF GitHub Bot commented on BEAM-1948:
--------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/beam/pull/2573


> Null pointer exception in DirectRunner.DirectPipelineResult.getAggregatorValues()
> ---------------------------------------------------------------------------------
>
>                 Key: BEAM-1948
>                 URL: https://issues.apache.org/jira/browse/BEAM-1948
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Etienne Chauchot
>            Assignee: Etienne Chauchot
>            Priority: Minor
>
> The null pointer exception occurs because an {{Aggregator}} is not present in 
> {{aggregatorSteps}} (possibly because it is not present in the DAG).
> The exception can be reproduced with a simple pipeline that uses an 
> {{Aggregator}} and a {{State}}, such as this one:
> {code}
>     // Pipeline construction and the call that triggers the exception
>     IdentityDoFn identityDoFn = new IdentityDoFn();
>     p.apply(Create.of(KV.of("key", "element1"), KV.of("key", "element2"),
>             KV.of("key", "element3")))
>         .apply(ParDo.of(identityDoFn));
>     PipelineResult pipelineResult = p.run();
>     pipelineResult.getAggregatorValues(identityDoFn.getCounter()).getValues();
>
>   // DoFn that uses both an Aggregator and a State
>   private static class IdentityDoFn extends DoFn<KV<String, String>, String> {
>     private final Aggregator<Long, Long> counter =
>         createAggregator("counter", Sum.ofLongs());
>     private static final String STATE_ID = "state";
>     @StateId(STATE_ID)
>     private static final StateSpec<Object, ValueState<String>> stateSpec =
>         StateSpecs.value(StringUtf8Coder.of());
>     @ProcessElement
>     public void processElement(ProcessContext context,
>         @StateId(STATE_ID) ValueState<String> state) {
>       state.write("state content");
>       counter.addValue(1L);
>       context.output(context.element().getValue());
>     }
>     public Aggregator<Long, Long> getCounter() {
>       return counter;
>     }
>   }
> {code}
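
The failure mode described above (a lookup for an {{Aggregator}} that is absent from {{aggregatorSteps}}) can be illustrated without Beam. The following is only a minimal sketch under stated assumptions: the class name, the String keys, and both helper methods are hypothetical stand-ins, not the DirectRunner code and not necessarily how the pull request fixed the issue.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

class AggregatorLookupSketch {
    // Hypothetical stand-in for the runner's map from aggregator to the steps that use it.
    static final Map<String, Collection<String>> aggregatorSteps = new HashMap<>();

    // Unguarded lookup: Map.get returns null for an unknown key,
    // so dereferencing the result throws NullPointerException.
    static int countStepsUnguarded(String aggregator) {
        Collection<String> steps = aggregatorSteps.get(aggregator);
        return steps.size();
    }

    // Guarded lookup: an aggregator that was never registered is treated as having no steps.
    static int countStepsGuarded(String aggregator) {
        Collection<String> steps = aggregatorSteps.get(aggregator);
        if (steps == null) {
            return 0;
        }
        return steps.size();
    }

    public static void main(String[] args) {
        // Only "otherCounter" is registered; "counter" is missing from the map.
        aggregatorSteps.put("otherCounter", Arrays.asList("step1", "step2"));

        System.out.println("guarded, registered: " + countStepsGuarded("otherCounter"));
        System.out.println("guarded, missing: " + countStepsGuarded("counter"));
        try {
            countStepsUnguarded("counter");
        } catch (NullPointerException e) {
            System.out.println("unguarded, missing: NullPointerException");
        }
    }
}
```

Whether a missing entry should yield an empty result or a descriptive exception is a design choice; the sketch picks the empty result purely for illustration.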



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
