[ https://issues.apache.org/jira/browse/BEAM-1948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Etienne Chauchot updated BEAM-1948:
-----------------------------------
Description:
The null pointer exception occurs because an {{Aggregator}} is not present in
{{aggregatorSteps}} (possibly because it is not present in the DAG).
It can be reproduced with a simple pipeline that uses an {{Aggregator}} and a
{{State}}, like this one:
{code}
IdentityDoFn identityDoFn = new IdentityDoFn();
p.apply(Create.of(KV.of("key", "element1"), KV.of("key", "element2"),
        KV.of("key", "element3")))
    .apply(ParDo.of(identityDoFn));
PipelineResult pipelineResult = p.run();
pipelineResult.getAggregatorValues(identityDoFn.getCounter()).getValues();

private static class IdentityDoFn extends DoFn<KV<String, String>, String> {
  private final Aggregator<Long, Long> counter = createAggregator("counter",
      Sum.ofLongs());
  private static final String STATE_ID = "state";

  @StateId(STATE_ID)
  private static final StateSpec<Object, ValueState<String>> stateSpec =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(ProcessContext context, @StateId(STATE_ID)
      ValueState<String> state) {
    state.write("state content");
    counter.addValue(1L);
    context.output(context.element().getValue());
  }

  public Aggregator<Long, Long> getCounter() {
    return counter;
  }
}
{code}
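The failure mode can be illustrated without Beam. The following is only a minimal sketch, not the DirectRunner code: the class {{AggregatorStepsSketch}}, its string-keyed map, and the methods {{register}}, {{unsafeContains}} and {{safeContains}} are all hypothetical names chosen for illustration. It assumes the steps for an aggregator are looked up in a map, so that an aggregator that was never registered yields {{null}}, and calling {{contains}} on that result throws.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class AggregatorStepsSketch {
    // Hypothetical stand-in for an aggregator-to-steps map (assumption, not Beam's type).
    private final Map<String, Collection<String>> aggregatorSteps = new HashMap<>();

    public void register(String aggregator, String... steps) {
        aggregatorSteps.put(aggregator, Arrays.asList(steps));
    }

    // Uses the lookup result without a null check, so an unknown aggregator
    // leads to a NullPointerException on the contains() call.
    public boolean unsafeContains(String aggregator, String step) {
        Collection<String> steps = aggregatorSteps.get(aggregator);
        return steps.contains(step);
    }

    // Guarded variant: an aggregator with no registered steps matches nothing.
    public boolean safeContains(String aggregator, String step) {
        Collection<String> steps = aggregatorSteps.get(aggregator);
        if (steps == null) {
            return false;
        }
        return steps.contains(step);
    }

    public static void main(String[] args) {
        AggregatorStepsSketch sketch = new AggregatorStepsSketch();
        sketch.register("registered", "stepA");

        System.out.println(sketch.safeContains("registered", "stepA"));
        System.out.println(sketch.safeContains("missing", "stepA"));
        try {
            sketch.unsafeContains("missing", "stepA");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException for unregistered aggregator");
        }
    }
}
```

Whether a guard like {{safeContains}} is the right fix here, or whether the aggregator should instead always be registered, is left open by this sketch.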
was:
Running query 3 of Nexmark
(https://github.com/iemejia/beam/blob/BEAM-160-nexmark/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java)
in streaming mode (UnboundedSource) on the Direct runner throws a null pointer
exception in {{DirectRunner.DirectPipelineResult.getAggregatorValues()}}.
In {{if (steps.contains(transform.getTransform()))}}, {{steps == null}}.
To reproduce it, run {{NexmarkDirectDriver.main()}} with these options:
{code}
--query=3 --streaming=true --numEventGenerators=4 --manageResources=false
--monitorJobs=true --enforceEncodability=false --enforceImmutability=false
{code}
See the repository linked above.
> Null pointer exception in
> DirectRunner.DirectPipelineResult.getAggregatorValues()
> ---------------------------------------------------------------------------------
>
> Key: BEAM-1948
> URL: https://issues.apache.org/jira/browse/BEAM-1948
> Project: Beam
> Issue Type: Bug
> Components: runner-direct
> Reporter: Etienne Chauchot
> Assignee: Etienne Chauchot
> Priority: Minor