[ https://issues.apache.org/jira/browse/BEAM-1948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976937#comment-15976937 ]
ASF GitHub Bot commented on BEAM-1948:
--------------------------------------
Github user asfgit closed the pull request at:
https://github.com/apache/beam/pull/2573
> Null pointer exception in DirectRunner.DirectPipelineResult.getAggregatorValues()
> ---------------------------------------------------------------------------------
>
> Key: BEAM-1948
> URL: https://issues.apache.org/jira/browse/BEAM-1948
> Project: Beam
> Issue Type: Bug
> Components: runner-direct
> Reporter: Etienne Chauchot
> Assignee: Etienne Chauchot
> Priority: Minor
>
> The null pointer exception occurs because an {{Aggregator}} is not present in
> {{aggregatorSteps}} (possibly because it is not present in the DAG).
> It can be reproduced with a simple pipeline that uses an {{Aggregator}} and a
> {{State}}, like this one:
> {code}
> IdentityDoFn identityDoFn = new IdentityDoFn();
> p.apply(Create.of(KV.of("key", "element1"), KV.of("key", "element2"),
>         KV.of("key", "element3")))
>     .apply(ParDo.of(identityDoFn));
> PipelineResult pipelineResult = p.run();
> pipelineResult.getAggregatorValues(identityDoFn.getCounter()).getValues();
>
> private static class IdentityDoFn extends DoFn<KV<String, String>, String> {
>   private final Aggregator<Long, Long> counter =
>       createAggregator("counter", Sum.ofLongs());
>   private static final String STATE_ID = "state";
>
>   @StateId(STATE_ID)
>   private static final StateSpec<Object, ValueState<String>> stateSpec =
>       StateSpecs.value(StringUtf8Coder.of());
>
>   @ProcessElement
>   public void processElement(
>       ProcessContext context, @StateId(STATE_ID) ValueState<String> state) {
>     state.write("state content");
>     counter.addValue(1L);
>     context.output(context.element().getValue());
>   }
>
>   public Aggregator<Long, Long> getCounter() {
>     return counter;
>   }
> }
> {code}
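
The failure mode described above (a lookup for an {{Aggregator}} that was never registered in {{aggregatorSteps}}) can be illustrated without Beam. The following is only a minimal sketch under stated assumptions: the class AggregatorLookupSketch, both methods, and the String-keyed map are hypothetical stand-ins, not the DirectRunner's actual code and not necessarily how the linked pull request fixes the issue.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the runner's bookkeeping; not Beam's actual code.
public class AggregatorLookupSketch {

    // Unguarded lookup: Map.get returns null for an unregistered aggregator,
    // so calling size() on the result throws NullPointerException.
    static int unguardedStepCount(
            Map<String, Collection<String>> aggregatorSteps, String name) {
        Collection<String> steps = aggregatorSteps.get(name);
        return steps.size();
    }

    // Guarded lookup: treat an unregistered aggregator as having no steps.
    static Collection<String> stepsFor(
            Map<String, Collection<String>> aggregatorSteps, String name) {
        Collection<String> steps = aggregatorSteps.get(name);
        return steps == null ? Collections.<String>emptyList() : steps;
    }

    public static void main(String[] args) {
        Map<String, Collection<String>> aggregatorSteps = new HashMap<>();
        // Only one aggregator is registered; "counter" is deliberately absent.
        aggregatorSteps.put("registered", Arrays.asList("ParDo(Identity)"));

        try {
            unguardedStepCount(aggregatorSteps, "counter");
        } catch (NullPointerException e) {
            System.out.println("unguarded lookup threw NullPointerException");
        }

        System.out.println(
            "guarded lookup found " + stepsFor(aggregatorSteps, "counter").size() + " steps");
    }
}
```

Whether a missing aggregator should yield an empty result or a descriptive exception is a design choice; the sketch picks the empty result only to keep the example short.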