[ https://issues.apache.org/jira/browse/BEAM-4301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17132222#comment-17132222 ]

Beam JIRA Bot commented on BEAM-4301:
-------------------------------------

This issue is assigned but has not received an update in 30 days so it has been 
labeled "stale-assigned". If you are still working on the issue, please give an 
update and remove the label. If you are no longer working on the issue, please 
unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.

> NullPointerException with ValueState of custom class
> ----------------------------------------------------
>
>                 Key: BEAM-4301
>                 URL: https://issues.apache.org/jira/browse/BEAM-4301
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.4.0
>            Reporter: Xiu Shi
>            Assignee: Batkhuyag Batsaikhan
>            Priority: P2
>              Labels: stale-assigned
>
> This issue was introduced in Beam 2.4.0 and breaks my existing implementation
> of stateful processing when unit testing with the direct runner.
> I'm writing a DoFn that has a `ValueState` of a custom class, which acts as a
> cache. In short, the first item runs through the DoFn just fine, but a
> NullPointerException is thrown before the second item reaches the DoFn.
> If I switch to a `ValueState` of a string, everything works fine.
> A simplified version of the DoFn looks like the following:
> {code:java}
> public class GetPredictionsFn
>     extends DoFn<KV<List<String>, String>, KV<String, ArrayList<String>>> {
>     @StateId("cache")
>     private final StateSpec<ValueState<PredictionResponseDto>> cache =
>         StateSpecs.value(AvroCoder.of(PredictionResponseDto.class));
> ...
>     @ProcessElement
>     public void processElement(ProcessContext c,
>             @StateId("cache") ValueState<PredictionResponseDto> cache)
>             throws IOException {
>         Gson gson = new Gson();
>         System.out.println("here");
>         List<String> contentIds = c.element().getKey();
>         String sessionId = c.element().getValue();
>         System.out.println(sessionId);
>         ArrayList<String> strItems = null;
>         
>         try {
>             PredictionResponseDto parsedResponse = cache.read(); 
> //            strItems = parsedResponse.getStrItems();
>             if (parsedResponse == null) {
>                 throw new RuntimeException();
>             }
>             System.out.println("read");
>             System.out.println(gson.toJson(parsedResponse));
>         }
>         catch (Exception e) {
>             String jsonStr =
>                 "{\"predictions\":[{\"output\":[19]}],\"strItems\":[\"1.9\"],\"timestamp\":1526485646091}";
>             PredictionResponseDto parsedResponse =
>                 PredictionResponseDto.parseJson(jsonStr);
>             strItems = parsedResponse.getStrItems();
>             System.out.println("write");
>             System.out.println(gson.toJson(parsedResponse));
>             System.out.println(gson.toJson(strItems));
>             cache.write(parsedResponse);
>             PredictionResponseDto parsedResponse1 = cache.read(); 
>             System.out.println(gson.toJson(parsedResponse1));
>         }
>         
>         KV<String, ArrayList<String>> predictionKv = KV.of(sessionId, strItems);
>         System.out.println(gson.toJson(predictionKv));
>         c.output(predictionKv);
>     }
> }
> {code}
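Reading the trace further down, the NPE appears to come from Avro's reflect writer reaching a null array-typed field named `key` inside `PredictionResponseDto$Prediction`. The JSON used in the catch block never sets `key`, so after parsing it is presumably null. Avro reflect schemas, which `AvroCoder.of(...)` relies on, treat fields as non-nullable unless they are annotated, for example with `org.apache.avro.reflect.Nullable`. One possible workaround is to normalize the value before `cache.write(...)`, assuming the DTO's collection fields can safely default to empty lists. The sketch below shows that normalization. The classes are hypothetical stand-ins. Only the field names `predictions`, `strItems`, `timestamp`, `output` and `key` come from the report; their types, and in particular `List<String>` for `key`, are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for PredictionResponseDto$Prediction; field types are assumed.
class PredictionSketch {
    List<Integer> output;
    List<String> key; // assumed element type; the trace only says "array"
}

// Hypothetical stand-in for PredictionResponseDto; not the reporter's class.
class PredictionResponseSketch {
    List<PredictionSketch> predictions;
    List<String> strItems;
    long timestamp;

    // Replaces every null list, including those inside each prediction, with an
    // empty list, so an encoder whose array fields are non-nullable never sees null.
    PredictionResponseSketch withNullListsAsEmpty() {
        if (predictions == null) {
            predictions = new ArrayList<>();
        }
        for (PredictionSketch p : predictions) {
            if (p.output == null) {
                p.output = new ArrayList<>();
            }
            if (p.key == null) {
                p.key = new ArrayList<>();
            }
        }
        if (strItems == null) {
            strItems = new ArrayList<>();
        }
        return this;
    }
}
```

The other option is to annotate the fields that may legitimately be absent with Avro's `@Nullable`, so that `AvroCoder` encodes the null itself. Which one fits depends on whether downstream code treats an empty list and a missing one the same way.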
> And the test pipeline looks like the following:
> {code:java}
> // Create a test pipeline.
> Pipeline p = Pipeline.create();
> // Create timestamps
> // currentTime needs to be set in the past; otherwise the first window will not
> // be set to currentTime + expiry, but to the pipeline's creation time + expiry.
> Instant currentTime = new Instant(0L); 
> Instant onTime = currentTime
>         .plus(Duration.standardSeconds(streamingOptions.getCacheExpiry()))
>         .minus(Duration.standardSeconds(1));
> Instant lateTime = currentTime
>         .plus(Duration.standardSeconds(streamingOptions.getCacheExpiry()))
>         .plus(Duration.standardSeconds(1));
> // Create stream input.
> TestStream<KV<List<String>, String>> events =
>         TestStream.create(KvCoder.of(ListCoder.of(
>                 StringUtf8Coder.of()), StringUtf8Coder.of()))
>         // set the next timestamp to be currentTime
>         .advanceWatermarkTo(currentTime)
>         .addElements(KV.of(Arrays.asList("1"), "1"))
>         .advanceWatermarkTo(onTime)
>         .addElements(KV.of(Arrays.asList("1"), "2"))
>         .addElements(KV.of(Arrays.asList("1"), "3"))
>         .advanceWatermarkTo(lateTime)
>         .addElements(KV.of(Arrays.asList("1"), "4"))
>         .advanceWatermarkToInfinity();
> PCollection<KV<String, ArrayList<String>>> output =
>         p.apply(events).apply(Window.into(FixedWindows.of(
>             Duration.standardSeconds(streamingOptions.getCacheExpiry()))))
>         .apply(ParDo.of(doFn));
> PAssert.that(output).containsInAnyOrder(expect);
> {code}
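This section explains which elements in the test should share state. Beam scopes a stateful DoFn's state per key and window. Every element in this test has the key `["1"]`, so with fixed windows of `getCacheExpiry()` seconds aligned to epoch 0, elements "1", "2" and "3" should share one `ValueState`, and "4" should start with empty state. The plain-Java sketch below checks that arithmetic. It assumes a hypothetical expiry of 60 seconds and a zero window offset. It also assumes that each element takes as its timestamp the watermark that was current when it was added. `windowStart` is a hypothetical helper, not a Beam API.

```java
// Plain-Java check of fixed-window assignment for the test's timestamps.
final class WindowMath {
    private WindowMath() {}

    // Inclusive start of the fixed window containing timestampMillis, for
    // windows of sizeMillis aligned to epoch 0 (zero offset).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }
}

class WindowAssignmentDemo {
    public static void main(String[] args) {
        long expiryMillis = 60_000L; // hypothetical getCacheExpiry() of 60 s
        long currentTime = 0L;                               // element "1"
        long onTime = currentTime + expiryMillis - 1_000L;   // elements "2" and "3"
        long lateTime = currentTime + expiryMillis + 1_000L; // element "4"
        System.out.println(WindowMath.windowStart(currentTime, expiryMillis)); // 0
        System.out.println(WindowMath.windowStart(onTime, expiryMillis));      // 0
        System.out.println(WindowMath.windowStart(lateTime, expiryMillis));    // 60000
    }
}
```

Under these assumptions, element "2" is the first one that reads state written by an earlier element. That is consistent with where the NullPointerException appears.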
> The Trace looks like:
> {code:java}
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException: in ca.cbc.recsysdataflow.PredictionResponseDto in array in ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction in array null of array in field key of ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction of array in field predictions of ca.cbc.recsysdataflow.PredictionResponseDto
> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:342)
> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:312)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:206)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> at ca.cbc.recsysdataflow.test.transforms.streaming.GetPredictionsCacheTest.testCache(GetPredictionsCacheTest.java:156)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
> at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)
> Caused by: java.lang.NullPointerException: in ca.cbc.recsysdataflow.PredictionResponseDto in array in ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction in array null of array in field key of ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction of array in field predictions of ca.cbc.recsysdataflow.PredictionResponseDto
> at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
> at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
> at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
> at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
> at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
> at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
> at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.uncheckedClone(InMemoryStateInternals.java:648)
> at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.access$000(InMemoryStateInternals.java:62)
> at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:221)
> at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:181)
> at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$CopyOnBindBinderFactory$1.bindValue(CopyOnAccessInMemoryStateInternals.java:301)
> at org.apache.beam.runners.direct.repackaged.runners.core.StateTags$2.bindValue(StateTags.java:71)
> at org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:278)
> at org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:267)
> at org.apache.beam.runners.direct.repackaged.runners.core.StateTags$SimpleStateTag.bind(StateTags.java:324)
> at org.apache.beam.runners.direct.repackaged.runners.core.StateTable.get(StateTable.java:60)
> at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:127)
> at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:121)
> at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.state(SimpleDoFnRunner.java:518)
> at ca.cbc.recsysdataflow.transforms.streaming.GetPredictionsFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> Caused by: java.lang.NullPointerException
> at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:71)
> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
> at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
> at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
> at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
> at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
> at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
> at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
> at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
> at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:68)
> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
> at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
> at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
> at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
> at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
> at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
> at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
> at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
> at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
> at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
> at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
> at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
> at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.uncheckedClone(InMemoryStateInternals.java:648)
> at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.access$000(InMemoryStateInternals.java:62)
> at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:221)
> at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:181)
> at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$CopyOnBindBinderFactory$1.bindValue(CopyOnAccessInMemoryStateInternals.java:301)
> at org.apache.beam.runners.direct.repackaged.runners.core.StateTags$2.bindValue(StateTags.java:71)
> at org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:278)
> at org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:267)
> at org.apache.beam.runners.direct.repackaged.runners.core.StateTags$SimpleStateTag.bind(StateTags.java:324)
> at org.apache.beam.runners.direct.repackaged.runners.core.StateTable.get(StateTable.java:60)
> at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:127)
> at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:121)
> at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.state(SimpleDoFnRunner.java:518)
> at ca.cbc.recsysdataflow.transforms.streaming.GetPredictionsFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
> at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
> at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:72)
> at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:179)
> at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
> at org.apache.beam.runners.direct.StatefulParDoEvaluatorFactory$StatefulParDoEvaluator.processElement(StatefulParDoEvaluatorFactory.java:245)
> at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
> at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
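The `Caused by` frames suggest why only the second element fails. The `ValueState` write itself succeeds: the read right after `cache.write(...)` in the catch block works. The failure comes later, when the direct runner binds the state again. At that point it appears to copy the stored value through the coder (`CopyOnBindBinderFactory` -> `InMemoryValue.copy` -> `CoderUtils.clone` -> `AvroCoder.encode`), and that encode hits the null field. The simplified model below reproduces that sequence in plain Java. It is not the runner's actual code. `CopyOnBindValueState`, `CachedValue` and `deepCopy` are hypothetical names. `deepCopy` stands in for an encode/decode round trip that fails on a null list, the way a non-nullable Avro array field does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical value type; `key` mirrors the array field named in the trace.
class CachedValue {
    List<String> key;

    // Stands in for CoderUtils.clone: copying a null list throws
    // NullPointerException, much like encoding a non-nullable array field.
    static CachedValue deepCopy(CachedValue in) {
        CachedValue out = new CachedValue();
        out.key = new ArrayList<>(in.key); // NPE when in.key == null
        return out;
    }
}

// Simplified model of copy-on-bind value state: write/read only keep a
// reference, while binding the state again for a later element clones it.
final class CopyOnBindValueState<T> {
    private final UnaryOperator<T> cloner;
    private T value; // null means the state is empty

    CopyOnBindValueState(UnaryOperator<T> cloner) {
        this.cloner = cloner;
    }

    T read() {
        return value;
    }

    void write(T newValue) {
        value = newValue; // no cloning here, so an unencodable value is accepted
    }

    // Called before a later element sees the state; clones the stored value.
    CopyOnBindValueState<T> bindForNextElement() {
        CopyOnBindValueState<T> bound = new CopyOnBindValueState<>(cloner);
        bound.value = (value == null) ? null : cloner.apply(value);
        return bound;
    }
}

class CopyOnBindDemo {
    public static void main(String[] args) {
        CopyOnBindValueState<CachedValue> first =
            new CopyOnBindValueState<CachedValue>(CachedValue::deepCopy).bindForNextElement();
        CachedValue v = new CachedValue(); // key left null, as in the sample JSON
        first.write(v);
        System.out.println(first.read() == v); // true: read-after-write succeeds
        try {
            first.bindForNextElement(); // second element: the clone hits the null list
        } catch (NullPointerException e) {
            System.out.println("NPE on second bind");
        }
    }
}
```

In this model, the failure goes away if the stored value never contains a null list, or if the cloner tolerates null. The second option is the analogue of marking the Avro field `@Nullable`.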



--
This message was sent by Atlassian Jira
(v8.3.4#803005)