[
https://issues.apache.org/jira/browse/BEAM-4301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188664#comment-17188664
]
Beam JIRA Bot commented on BEAM-4301:
-------------------------------------
This issue was marked "stale-P2" and has not received a public comment in 14
days. It is now automatically moved to P3. If you are still affected by it, you
can comment and move it back to P2.
> NullPointerException with ValueState of custom class
> ----------------------------------------------------
>
> Key: BEAM-4301
> URL: https://issues.apache.org/jira/browse/BEAM-4301
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.4.0
> Reporter: Xiu Shi
> Priority: P3
>
> This is an issue introduced in Beam version 2.4.0, which breaks my current
> implementation of stateful processing during unit tests with the direct runner.
> I'm writing a DoFn that has a `ValueState` of a custom class which acts as
> a cache. In short, the first item will run through the DoFn just fine, then
> a NullPointerException will be thrown before the second item reaches the DoFn.
> If I switch to a `ValueState` of a string, then everything works fine.
> A simplified version of the DoFn looks like the following:
> {code:java}
> public class GetPredictionsFn
> extends DoFn<KV<List<String>, String>, KV<String, ArrayList<String>>> {
> @StateId("cache")
> private final StateSpec<ValueState<PredictionResponseDto>> cache =
> StateSpecs.value(AvroCoder.of(PredictionResponseDto.class));
> ...
> @ProcessElement
> public void processElement(ProcessContext c,
> @StateId("cache") ValueState<PredictionResponseDto> cache
> )
> throws IOException {
> Gson gson = new Gson();
> System.out.println("here");
> List<String> contentIds = c.element().getKey();
> String sessionId = c.element().getValue();
> System.out.println(sessionId);
> ArrayList<String> strItems = null;
>
> try {
> PredictionResponseDto parsedResponse = cache.read();
> // strItems = parsedResponse.getStrItems();
> if (parsedResponse == null) {
> throw new RuntimeException();
> }
> System.out.println("read");
> System.out.println(gson.toJson(parsedResponse));
> }
> catch (Exception e) {
> String jsonStr =
> "{\"predictions\":[{\"output\":[19]}],\"strItems\":[\"1.9\"],\"timestamp\":1526485646091}";
> PredictionResponseDto parsedResponse =
> PredictionResponseDto.parseJson(jsonStr);
> strItems = parsedResponse.getStrItems();
> System.out.println("write");
> System.out.println(gson.toJson(parsedResponse));
> System.out.println(gson.toJson(strItems));
> cache.write(parsedResponse);
> PredictionResponseDto parsedResponse1 = cache.read();
> System.out.println(gson.toJson(parsedResponse1));
> }
>
> KV<String, ArrayList<String>> predictionKv = KV.of(sessionId,
> strItems);
> System.out.println(gson.toJson(predictionKv));
> c.output(predictionKv);
> }
> }
> {code}
> And the test pipeline looks like the following:
> {code:java}
> // Create a test pipeline.
> Pipeline p = Pipeline.create();
> // Create timestamps
> // currentTime needs to be set to the past, otherwise the first window will
> // not be set to currentTime + expiry, but creation time of the pipeline + expiry.
> Instant currentTime = new Instant(0L);
> Instant onTime = currentTime
> .plus(Duration.standardSeconds(streamingOptions.getCacheExpiry()))
> .minus(Duration.standardSeconds(1));
> Instant lateTime = currentTime
> .plus(Duration.standardSeconds(streamingOptions.getCacheExpiry()))
> .plus(Duration.standardSeconds(1));
> // Create stream input.
> TestStream<KV<List<String>, String>> events =
> TestStream.create(KvCoder.of(ListCoder.of(
> StringUtf8Coder.of()), StringUtf8Coder.of()))
> // set the next timestamp to be currentTime
> .advanceWatermarkTo(currentTime)
> .addElements(KV.of(Arrays.asList("1"), "1"))
> .advanceWatermarkTo(onTime)
> .addElements(KV.of(Arrays.asList("1"), "2"))
> .addElements(KV.of(Arrays.asList("1"), "3"))
> .advanceWatermarkTo(lateTime)
> .addElements(KV.of(Arrays.asList("1"), "4"))
> .advanceWatermarkToInfinity();
> PCollection<KV<String, ArrayList<String>>> output =
> p.apply(events).apply(Window.into(FixedWindows.of(
> Duration.standardSeconds(streamingOptions.getCacheExpiry()))))
> .apply(ParDo.of(doFn));
> PAssert.that(output).containsInAnyOrder(expect);
> {code}
> The stack trace looks like:
> {code:java}
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.NullPointerException: in
> ca.cbc.recsysdataflow.PredictionResponseDto in array in
> ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction in array null of
> array in field key of ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction
> of array in field predictions of ca.cbc.recsysdataflow.PredictionResponseDto
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:342)
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:312)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:206)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> at
> ca.cbc.recsysdataflow.test.transforms.streaming.GetPredictionsCacheTest.testCache(GetPredictionsCacheTest.java:156)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at
> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
> at
> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)
> Caused by: java.lang.NullPointerException: in
> ca.cbc.recsysdataflow.PredictionResponseDto in array in
> ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction in array null of
> array in field key of ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction
> of array in field predictions of ca.cbc.recsysdataflow.PredictionResponseDto
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
> at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
> at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
> at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
> at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
> at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.uncheckedClone(InMemoryStateInternals.java:648)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.access$000(InMemoryStateInternals.java:62)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:221)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:181)
> at
> org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$CopyOnBindBinderFactory$1.bindValue(CopyOnAccessInMemoryStateInternals.java:301)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.StateTags$2.bindValue(StateTags.java:71)
> at
> org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:278)
> at
> org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:267)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.StateTags$SimpleStateTag.bind(StateTags.java:324)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.StateTable.get(StateTable.java:60)
> at
> org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:127)
> at
> org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:121)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.state(SimpleDoFnRunner.java:518)
> at
> ca.cbc.recsysdataflow.transforms.streaming.GetPredictionsFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> Caused by: java.lang.NullPointerException
> at
> org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:71)
> at
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
> at
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
> at
> org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
> at
> org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
> at
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
> at
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
> at
> org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
> at
> org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:68)
> at
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
> at
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
> at
> org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
> at
> org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
> at
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
> at
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
> at
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
> at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
> at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
> at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
> at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
> at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.uncheckedClone(InMemoryStateInternals.java:648)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.access$000(InMemoryStateInternals.java:62)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:221)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:181)
> at
> org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$CopyOnBindBinderFactory$1.bindValue(CopyOnAccessInMemoryStateInternals.java:301)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.StateTags$2.bindValue(StateTags.java:71)
> at
> org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:278)
> at
> org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:267)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.StateTags$SimpleStateTag.bind(StateTags.java:324)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.StateTable.get(StateTable.java:60)
> at
> org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:127)
> at
> org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:121)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.state(SimpleDoFnRunner.java:518)
> at
> ca.cbc.recsysdataflow.transforms.streaming.GetPredictionsFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:72)
> at
> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:179)
> at
> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
> at
> org.apache.beam.runners.direct.StatefulParDoEvaluatorFactory$StatefulParDoEvaluator.processElement(StatefulParDoEvaluatorFactory.java:245)
> at
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
> at
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)