Xiu Shi created BEAM-4301:
-----------------------------

             Summary: NullPointerException with ValueState of custom class
                 Key: BEAM-4301
                 URL: https://issues.apache.org/jira/browse/BEAM-4301
             Project: Beam
          Issue Type: Bug
          Components: sdk-java-core
    Affects Versions: 2.4.0
            Reporter: Xiu Shi
            Assignee: Kenneth Knowles


This issue was introduced in Beam 2.4.0. It breaks my existing implementation of stateful processing when unit testing with the DirectRunner.

I'm writing a DoFn that has a `ValueState` of a custom class, which acts as a cache. In short, the first element passes through the DoFn without problems, but a NullPointerException is thrown before the second element reaches the DoFn. If I switch to a `ValueState` of a string, everything works fine.

A simplified version of the DoFn looks like the following:
{code:java}
public class GetPredictionsFn
    extends DoFn<KV<List<String>, String>, KV<String, ArrayList<String>>> {

    @StateId("cache")
    private final StateSpec<ValueState<PredictionResponseDto>> cache =
        StateSpecs.value(AvroCoder.of(PredictionResponseDto.class));

...

    @ProcessElement
    public void processElement(ProcessContext c,
            @StateId("cache") ValueState<PredictionResponseDto> cache)
            throws IOException {
        Gson gson = new Gson();

        System.out.println("here");

        List<String> contentIds = c.element().getKey();
        String sessionId = c.element().getValue();
        System.out.println(sessionId);

        ArrayList<String> strItems = null;

        try {
            PredictionResponseDto parsedResponse = cache.read();
            // strItems = parsedResponse.getStrItems();
            if (parsedResponse == null) {
                throw new RuntimeException();
            }
            System.out.println("read");
            System.out.println(gson.toJson(parsedResponse));
        }
        catch (Exception e) {
            String jsonStr =
                "{\"predictions\":[{\"output\":[19]}],\"strItems\":[\"1.9\"],\"timestamp\":1526485646091}";

            PredictionResponseDto parsedResponse =
                PredictionResponseDto.parseJson(jsonStr);
            strItems = parsedResponse.getStrItems();
            System.out.println("write");
            System.out.println(gson.toJson(parsedResponse));
            System.out.println(gson.toJson(strItems));
            cache.write(parsedResponse);
            PredictionResponseDto parsedResponse1 = cache.read();
            System.out.println(gson.toJson(parsedResponse1));
        }

        KV<String, ArrayList<String>> predictionKv = KV.of(sessionId, strItems);

        System.out.println(gson.toJson(predictionKv));

        c.output(predictionKv);
    }
}
{code}
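For reference, the read-through caching logic in `processElement` can also be written with an explicit null check, without throwing a `RuntimeException` to signal a cache miss. The sketch below is plain Java, not Beam code. `SimpleValueState` is a hypothetical stand-in for Beam's `ValueState` and models only `read()` and `write()`. `Dto` and `fetch()` are simplified placeholders for `PredictionResponseDto` and `PredictionResponseDto.parseJson(...)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Beam's ValueState<T>: only read() and write() are modelled.
interface SimpleValueState<T> {
    T read();
    void write(T value);
}

// Trivial in-memory implementation, for illustration only.
class InMemoryState<T> implements SimpleValueState<T> {
    private T value;
    public T read() { return value; }
    public void write(T value) { this.value = value; }
}

// Simplified placeholder for PredictionResponseDto: only the strItems field.
class Dto {
    final List<String> strItems;
    Dto(List<String> strItems) { this.strItems = strItems; }
}

class ReadThroughCache {
    // Counts how often the expensive lookup ran, so hits and misses are observable.
    static int fetches = 0;

    // Hypothetical placeholder for PredictionResponseDto.parseJson(...) or a remote call.
    static Dto fetch() {
        fetches++;
        return new Dto(new ArrayList<>(Arrays.asList("1.9")));
    }

    // Read from state; on a miss, fetch, write the result back, and use the fresh value.
    static List<String> lookup(SimpleValueState<Dto> cache) {
        Dto cached = cache.read();
        if (cached == null) {      // cache miss, handled without an exception
            cached = fetch();
            cache.write(cached);
        }
        return cached.strItems;    // set on both the hit path and the miss path
    }
}
```

Unlike the DoFn snippet above, this sketch returns the cached items on the hit path too. In the original, the `getStrItems()` line inside the `try` block is commented out, so a cache hit emits a `KV` with a null value.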
And the test pipeline looks like the following:
{code:java}
// Create a test pipeline.
Pipeline p = Pipeline.create();

// Create timestamps.
// currentTime needs to be set to the past; otherwise the first window will not be set to
// currentTime + expiry, but to the creation time of the pipeline + expiry.
Instant currentTime = new Instant(0L);
Instant onTime = currentTime
        .plus(Duration.standardSeconds(streamingOptions.getCacheExpiry()))
        .minus(Duration.standardSeconds(1));
Instant lateTime = currentTime
        .plus(Duration.standardSeconds(streamingOptions.getCacheExpiry()))
        .plus(Duration.standardSeconds(1));

// Create stream input.
TestStream<KV<List<String>, String>> events =
        TestStream.create(KvCoder.of(ListCoder.of(
                StringUtf8Coder.of()), StringUtf8Coder.of()))
        // set the next timestamp to be currentTime
        .advanceWatermarkTo(currentTime)
        .addElements(KV.of(Arrays.asList("1"), "1"))
        .advanceWatermarkTo(onTime)
        .addElements(KV.of(Arrays.asList("1"), "2"))
        .addElements(KV.of(Arrays.asList("1"), "3"))
        .advanceWatermarkTo(lateTime)
        .addElements(KV.of(Arrays.asList("1"), "4"))
        .advanceWatermarkToInfinity();

PCollection<KV<String, ArrayList<String>>> output =
        p.apply(events).apply(Window.into(FixedWindows.of(
            Duration.standardSeconds(streamingOptions.getCacheExpiry()))))
        .apply(ParDo.of(doFn));

PAssert.that(output).containsInAnyOrder(expect);
{code}
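The test relies on each element being stamped with the current watermark, as the "set the next timestamp" comment indicates. That behaviour of `TestStream.Builder.addElements` is assumed here, not checked. Under that assumption, the elements are assigned to fixed windows as sketched below, using zero-offset `FixedWindows` arithmetic. The sketch uses `java.time` instead of Joda-Time so that it runs without Beam. The 60-second `expirySeconds` is a hypothetical value standing in for `streamingOptions.getCacheExpiry()`.

```java
import java.time.Duration;
import java.time.Instant;

class FixedWindowMath {
    // Start of the fixed window containing ts, for window size `size` and zero offset:
    // start = ts - (ts mod size). floorMod keeps this correct for negative timestamps.
    static Instant windowStart(Instant ts, Duration size) {
        long millis = ts.toEpochMilli();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, size.toMillis()));
    }

    public static void main(String[] args) {
        long expirySeconds = 60; // hypothetical value for streamingOptions.getCacheExpiry()
        Duration size = Duration.ofSeconds(expirySeconds);
        Instant currentTime = Instant.ofEpochMilli(0L);
        Instant onTime = currentTime.plus(size).minusSeconds(1);   // expiry - 1s
        Instant lateTime = currentTime.plus(size).plusSeconds(1);  // expiry + 1s

        System.out.println("element 1 window start:    " + windowStart(currentTime, size));
        System.out.println("elements 2, 3 window start: " + windowStart(onTime, size));
        System.out.println("element 4 window start:    " + windowStart(lateTime, size));
    }
}
```

Under these assumptions, elements "1", "2" and "3" share the window [0, expiry). Beam state is scoped per key and window, and all four elements have the key `["1"]`, so elements 2 and 3 read the state cell written by element 1. Element "4" falls into [expiry, 2 * expiry) and starts with empty state.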
The stack trace looks like:
{code:java}
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException: in ca.cbc.recsysdataflow.PredictionResponseDto in array in ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction in array null of array in field key of ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction of array in field predictions of ca.cbc.recsysdataflow.PredictionResponseDto
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:342)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:312)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:206)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at ca.cbc.recsysdataflow.test.transforms.streaming.GetPredictionsCacheTest.testCache(GetPredictionsCacheTest.java:156)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)
Caused by: java.lang.NullPointerException: in ca.cbc.recsysdataflow.PredictionResponseDto in array in ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction in array null of array in field key of ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction of array in field predictions of ca.cbc.recsysdataflow.PredictionResponseDto
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.uncheckedClone(InMemoryStateInternals.java:648)
at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.access$000(InMemoryStateInternals.java:62)
at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:221)
at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:181)
at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$CopyOnBindBinderFactory$1.bindValue(CopyOnAccessInMemoryStateInternals.java:301)
at org.apache.beam.runners.direct.repackaged.runners.core.StateTags$2.bindValue(StateTags.java:71)
at org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:278)
at org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:267)
at org.apache.beam.runners.direct.repackaged.runners.core.StateTags$SimpleStateTag.bind(StateTags.java:324)
at org.apache.beam.runners.direct.repackaged.runners.core.StateTable.get(StateTable.java:60)
at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:127)
at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:121)
at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.state(SimpleDoFnRunner.java:518)
at ca.cbc.recsysdataflow.transforms.streaming.GetPredictionsFn$DoFnInvoker.invokeProcessElement(Unknown Source)
Caused by: java.lang.NullPointerException
at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:71)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:68)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.uncheckedClone(InMemoryStateInternals.java:648)
at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.access$000(InMemoryStateInternals.java:62)
at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:221)
at org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:181)
at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$CopyOnBindBinderFactory$1.bindValue(CopyOnAccessInMemoryStateInternals.java:301)
at org.apache.beam.runners.direct.repackaged.runners.core.StateTags$2.bindValue(StateTags.java:71)
at org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:278)
at org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:267)
at org.apache.beam.runners.direct.repackaged.runners.core.StateTags$SimpleStateTag.bind(StateTags.java:324)
at org.apache.beam.runners.direct.repackaged.runners.core.StateTable.get(StateTable.java:60)
at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:127)
at org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:121)
at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.state(SimpleDoFnRunner.java:518)
at ca.cbc.recsysdataflow.transforms.streaming.GetPredictionsFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:72)
at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:179)
at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
at org.apache.beam.runners.direct.StatefulParDoEvaluatorFactory$StatefulParDoEvaluator.processElement(StatefulParDoEvaluatorFactory.java:245)
at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}
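Reading the trace bottom-up, the NPE is not raised in `write()`. It is raised when `CopyOnAccessInMemoryStateInternals` binds the `cache` state for the next element and copies the stored value through `InMemoryValue.copy` → `CoderUtils.clone` → `AvroCoder.encode`. That would explain why the first element passes and the failure only appears on the next state access. Avro's message points at a null array-valued field `key` in a `Prediction` inside the `predictions` array. A possible cause is that the sample JSON contains no `key` entry, but that is a guess.

The sketch below is a plain-Java toy model of this behaviour, not the DirectRunner's actual code. `StrictCoder` is a hypothetical encoder that rejects a null list, as a non-nullable Avro array schema does. `CopyOnAccessCell` is a hypothetical state cell that clones its value through an encode/decode round trip when the state is bound.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Toy value type with one array-valued field, mirroring the `key` field named in the trace.
class Prediction {
    List<String> key;
    Prediction(List<String> key) { this.key = key; }
}

// Hypothetical coder: like a non-nullable Avro array schema, it cannot encode a null list.
class StrictCoder {
    static byte[] encode(Prediction p) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(p.key.size());             // throws NullPointerException if key == null
            for (String s : p.key) out.writeUTF(s);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Prediction decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int n = in.readInt();
            List<String> key = new ArrayList<>(n);
            for (int i = 0; i < n; i++) key.add(in.readUTF());
            return new Prediction(key);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Clone by round-tripping through the encoding, analogous to CoderUtils.clone.
    static Prediction cloneValue(Prediction p) {
        return decode(encode(p));
    }
}

// Toy state cell: write() only stores the reference; bind() hands out an encoded/decoded copy.
class CopyOnAccessCell {
    private Prediction stored;

    void write(Prediction p) { stored = p; }        // no encoding happens here

    Prediction bind() {                             // encoding happens on the next access
        return stored == null ? null : StrictCoder.cloneValue(stored);
    }
}
```

In this model, writing a `Prediction` with a null `key` succeeds, and the following `bind()` throws. Replacing the null list with an empty one lets the round trip succeed. For the real class, Avro's reflect API provides `@org.apache.avro.reflect.Nullable`, which turns a field's schema into a union with null. Applying it to the affected fields of `PredictionResponseDto` is a possible workaround, but it has not been verified against this report.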



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
