[ 
https://issues.apache.org/jira/browse/BEAM-4301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188664#comment-17188664
 ] 

Beam JIRA Bot commented on BEAM-4301:
-------------------------------------

This issue was marked "stale-P2" and has not received a public comment in 14 
days. It is now automatically moved to P3. If you are still affected by it, you 
can comment and move it back to P2.

> NullPointerException with ValueState of custom class
> ----------------------------------------------------
>
>                 Key: BEAM-4301
>                 URL: https://issues.apache.org/jira/browse/BEAM-4301
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.4.0
>            Reporter: Xiu Shi
>            Priority: P3
>
> This is an issue introduced by Beam version 2.4.0, which breaks my current 
> implementation of stateful processing during unit tests with the direct runner.
> I'm writing a DoFn that has a `ValueState` of a custom class which acts as 
> a cache. In short, the first item will run through the DoFn just fine, then 
> a NullPointerException will be thrown before the second item reaches the DoFn. 
> If I switch to a `ValueState` of a string, then everything works fine.
> A simplified version of the DoFn looks like the following:
> {code:java}
> public class GetPredictionsFn
>     extends DoFn<KV<List<String>, String>, KV<String, ArrayList<String>>> {
>     @StateId("cache")
>     private final StateSpec<ValueState<PredictionResponseDto>> cache =
>         StateSpecs.value(AvroCoder.of(PredictionResponseDto.class));
> ...
>     @ProcessElement
>     public void processElement(ProcessContext c,
>           @StateId("cache") ValueState<PredictionResponseDto> cache
>             )
>             throws IOException {
>         Gson gson = new Gson();
>         System.out.println("here");
>         List<String> contentIds = c.element().getKey();
>         String sessionId = c.element().getValue();
>         System.out.println(sessionId);
>         ArrayList<String> strItems = null;
>         
>         try {
>             PredictionResponseDto parsedResponse = cache.read(); 
> //            strItems = parsedResponse.getStrItems();
>             if (parsedResponse == null) {
>                 throw new RuntimeException();
>             }
>             System.out.println("read");
>             System.out.println(gson.toJson(parsedResponse));
>         }
>         catch (Exception e) {
>             String jsonStr = 
> "{\"predictions\":[{\"output\":[19]}],\"strItems\":[\"1.9\"],\"timestamp\":1526485646091}";
>             PredictionResponseDto parsedResponse = 
> PredictionResponseDto.parseJson(jsonStr);
>             strItems = parsedResponse.getStrItems();
>             System.out.println("write");
>             System.out.println(gson.toJson(parsedResponse));
>             System.out.println(gson.toJson(strItems));
>             cache.write(parsedResponse);
>             PredictionResponseDto parsedResponse1 = cache.read(); 
>             System.out.println(gson.toJson(parsedResponse1));
>         }
>         
>         KV<String, ArrayList<String>> predictionKv = KV.of(sessionId, 
> strItems);
>         System.out.println(gson.toJson(predictionKv));
>         c.output(predictionKv);
>     }
> }
> {code}
> And the test pipeline looks like the following:
> {code:java}
> // Create a test pipeline.
> Pipeline p = Pipeline.create();
> // Create timestamps
> // currentTime needs to be set to the past, otherwise the first window will
> // not be set to currentTime + expiry, but creation time of the pipeline + expiry.
> Instant currentTime = new Instant(0L); 
> Instant onTime = currentTime
>         .plus(Duration.standardSeconds(streamingOptions.getCacheExpiry()))
>         .minus(Duration.standardSeconds(1));
> Instant lateTime = currentTime
>         .plus(Duration.standardSeconds(streamingOptions.getCacheExpiry()))
>         .plus(Duration.standardSeconds(1));
> // Create stream input.
> TestStream<KV<List<String>, String>> events =
>         TestStream.create(KvCoder.of(ListCoder.of(
>                 StringUtf8Coder.of()), StringUtf8Coder.of()))
>         // set the next timestamp to be currentTime
>         .advanceWatermarkTo(currentTime)
>         .addElements(KV.of(Arrays.asList("1"), "1"))
>         .advanceWatermarkTo(onTime)
>         .addElements(KV.of(Arrays.asList("1"), "2"))
>         .addElements(KV.of(Arrays.asList("1"), "3"))
>         .advanceWatermarkTo(lateTime)
>         .addElements(KV.of(Arrays.asList("1"), "4"))
>         .advanceWatermarkToInfinity();
> PCollection<KV<String, ArrayList<String>>> output =
>         p.apply(events).apply(Window.into(FixedWindows.of(
>             Duration.standardSeconds(streamingOptions.getCacheExpiry()))))
>         .apply(ParDo.of(doFn));
> PAssert.that(output).containsInAnyOrder(expect);
> {code}
> The stack trace looks like:
> {code:java}
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> java.lang.NullPointerException: in 
> ca.cbc.recsysdataflow.PredictionResponseDto in array in 
> ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction in array null of 
> array in field key of ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction 
> of array in field predictions of ca.cbc.recsysdataflow.PredictionResponseDto
> at 
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:342)
> at 
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:312)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:206)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> at 
> ca.cbc.recsysdataflow.test.transforms.streaming.GetPredictionsCacheTest.testCache(GetPredictionsCacheTest.java:156)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at 
> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
> at 
> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
> at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
> at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
> at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
> at 
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)
> Caused by: java.lang.NullPointerException: in 
> ca.cbc.recsysdataflow.PredictionResponseDto in array in 
> ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction in array null of 
> array in field key of ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction 
> of array in field predictions of ca.cbc.recsysdataflow.PredictionResponseDto
> at 
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
> at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
> at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
> at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
> at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
> at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
> at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.uncheckedClone(InMemoryStateInternals.java:648)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.access$000(InMemoryStateInternals.java:62)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:221)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:181)
> at 
> org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$CopyOnBindBinderFactory$1.bindValue(CopyOnAccessInMemoryStateInternals.java:301)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.StateTags$2.bindValue(StateTags.java:71)
> at 
> org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:278)
> at 
> org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:267)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.StateTags$SimpleStateTag.bind(StateTags.java:324)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.StateTable.get(StateTable.java:60)
> at 
> org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:127)
> at 
> org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:121)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.state(SimpleDoFnRunner.java:518)
> at 
> ca.cbc.recsysdataflow.transforms.streaming.GetPredictionsFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:71)
> at 
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
> at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
> at 
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
> at 
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
> at 
> org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
> at 
> org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
> at 
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
> at 
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
> at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
> at 
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
> at 
> org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
> at 
> org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:68)
> at 
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
> at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
> at 
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
> at 
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
> at 
> org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
> at 
> org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
> at 
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
> at 
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
> at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
> at 
> org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
> at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
> at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
> at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
> at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
> at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
> at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.uncheckedClone(InMemoryStateInternals.java:648)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.access$000(InMemoryStateInternals.java:62)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:221)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:181)
> at 
> org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$CopyOnBindBinderFactory$1.bindValue(CopyOnAccessInMemoryStateInternals.java:301)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.StateTags$2.bindValue(StateTags.java:71)
> at 
> org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:278)
> at 
> org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:267)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.StateTags$SimpleStateTag.bind(StateTags.java:324)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.StateTable.get(StateTable.java:60)
> at 
> org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:127)
> at 
> org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:121)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.state(SimpleDoFnRunner.java:518)
> at 
> ca.cbc.recsysdataflow.transforms.streaming.GetPredictionsFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
> at 
> org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:72)
> at 
> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:179)
> at 
> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
> at 
> org.apache.beam.runners.direct.StatefulParDoEvaluatorFactory$StatefulParDoEvaluator.processElement(StatefulParDoEvaluatorFactory.java:245)
> at 
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
> at 
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to