kennknowles opened a new issue, #18863:
URL: https://github.com/apache/beam/issues/18863

   This issue was introduced in Beam 2.4.0. It breaks my existing stateful-processing 
implementation when unit testing with the DirectRunner.
   
   I've written a DoFn that has a `ValueState` of a custom class, which acts as a 
cache. In short, the first element passes through the DoFn fine, but a 
NullPointerException is thrown before the second element reaches the DoFn. If 
I switch to a `ValueState` of a String, everything works fine.
   
   A simplified version of the DoFn looks like the following:
   ```
   
   public class GetPredictionsFn
   
       extends DoFn<KV<List<String>, String>, KV<String, ArrayList<String>>>
   {
   
       @StateId("cache")
   
       private final StateSpec<ValueState<PredictionResponseDto>> cache =
   
    
         StateSpecs.value(AvroCoder.of(PredictionResponseDto.class));
   
   ...
   
       @ProcessElement
   
    
     public void processElement(ProcessContext c,
   
             @StateId("cache") ValueState<PredictionResponseDto>
   cache
   
               )
   
               throws IOException {
   
           Gson gson = new Gson();
   
   
   
    
         System.out.println("here");
   
   
   
           List<String> contentIds = c.element().getKey();
   
    
         String sessionId = c.element().getValue();
   
           System.out.println(sessionId);
   
   
   
    
         ArrayList<String> strItems = null;
   
           
   
           try {
   
               PredictionResponseDto
   parsedResponse = cache.read(); 
   
   //            strItems = parsedResponse.getStrItems();
   
          
       if (parsedResponse == null) {
   
                   throw new RuntimeException();
   
               }
   
    
             System.out.println("read");
   
               System.out.println(gson.toJson(parsedResponse));
   
    
         }
   
           catch (Exception e) {
   
               String jsonStr = 
"{\"predictions\":[{\"output\":[19]}],\"strItems\":[\"1.9\"],\"timestamp\":1526485646091}";
   
   
   
    
             PredictionResponseDto parsedResponse = 
PredictionResponseDto.parseJson(jsonStr);
   
        
         strItems = parsedResponse.getStrItems();
   
               System.out.println("write");
   
        
         System.out.println(gson.toJson(parsedResponse));
   
               System.out.println(gson.toJson(strItems));
   
    
             cache.write(parsedResponse);
   
               PredictionResponseDto parsedResponse1 = cache.read(); 
   
    
             System.out.println(gson.toJson(parsedResponse1));
   
           }
   
           
   
           KV<String,
   ArrayList<String>> predictionKv = KV.of(sessionId, strItems);
   
   
   
           System.out.println(gson.toJson(predictionKv));
   
   
   
    
         c.output(predictionKv);
   
       }
   
   }
   
   ```
   
   And the test pipeline looks like the following:
   ```
   
   // Create a test pipeline.
   Pipeline p = Pipeline.create();

   // Length of one cache window, taken from the pipeline options.
   Duration expiry = Duration.standardSeconds(streamingOptions.getCacheExpiry());

   // Create timestamps.
   // currentTime needs to be set in the past; otherwise the first window will end at the
   // pipeline's creation time + expiry instead of at currentTime + expiry.
   Instant currentTime = new Instant(0L);
   // One second before currentTime + expiry.
   Instant onTime = currentTime.plus(expiry).minus(Duration.standardSeconds(1));
   // One second after currentTime + expiry.
   Instant lateTime = currentTime.plus(expiry).plus(Duration.standardSeconds(1));

   // Create stream input.
   TestStream<KV<List<String>, String>> events =
       TestStream.create(KvCoder.of(ListCoder.of(StringUtf8Coder.of()), StringUtf8Coder.of()))
           // set the next timestamp to be currentTime
           .advanceWatermarkTo(currentTime)
           .addElements(KV.of(Arrays.asList("1"), "1"))
           .advanceWatermarkTo(onTime)
           .addElements(KV.of(Arrays.asList("1"), "2"))
           .addElements(KV.of(Arrays.asList("1"), "3"))
           .advanceWatermarkTo(lateTime)
           .addElements(KV.of(Arrays.asList("1"), "4"))
           .advanceWatermarkToInfinity();

   PCollection<KV<String, ArrayList<String>>> output =
       p.apply(events)
           .apply(Window.into(FixedWindows.of(expiry)))
           .apply(ParDo.of(doFn));

   PAssert.that(output).containsInAnyOrder(expect);
   
   ```
   
   The stack trace looks like:
   ```
   
   org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException: in ca.cbc.recsysdataflow.PredictionResponseDto in array in ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction in array null of array in field key of ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction of array in field predictions of ca.cbc.recsysdataflow.PredictionResponseDto
   at
   
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:342)
   at
   
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:312)
   at
   org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:206)
   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
   at
   org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
   at
   
ca.cbc.recsysdataflow.test.transforms.streaming.GetPredictionsCacheTest.testCache(GetPredictionsCacheTest.java:156)
   at
   sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at
   
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:498)
   at
   
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
   at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
   at
   
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
   at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
   at
   org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
   at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
   at
   
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
   at
   org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
   at
   org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
   at
   org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
   at
   
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
   at 
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
   at
   
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
   at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
   at
   
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
   at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)
   Caused by: java.lang.NullPointerException: in ca.cbc.recsysdataflow.PredictionResponseDto in array in ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction in array null of array in field key of ca.cbc.recsysdataflow.PredictionResponseDto$.Prediction of array in field predictions of ca.cbc.recsysdataflow.PredictionResponseDto
   at 
org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
   at
   org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
   at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
   at
   org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
   at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
   at
   org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
   at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
   at
   org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
   at 
org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.uncheckedClone(InMemoryStateInternals.java:648)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.access$000(InMemoryStateInternals.java:62)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:221)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:181)
   at
   
org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$CopyOnBindBinderFactory$1.bindValue(CopyOnAccessInMemoryStateInternals.java:301)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.StateTags$2.bindValue(StateTags.java:71)
   at
   org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:278)
   at 
org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:267)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.StateTags$SimpleStateTag.bind(StateTags.java:324)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.StateTable.get(StateTable.java:60)
   at 
org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:127)
   at
   
org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:121)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.state(SimpleDoFnRunner.java:518)
   at
   
ca.cbc.recsysdataflow.transforms.streaming.GetPredictionsFn$DoFnInvoker.invokeProcessElement(Unknown
   Source)
   Caused by: java.lang.NullPointerException
   at 
org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:71)
   at
   
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
   at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
   at
   org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
   at 
org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
   at
   
org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
   at 
org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
   at
   
org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
   at 
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
   at
   org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
   at 
org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
   at
   
org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
   at 
org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:68)
   at
   
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
   at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
   at
   org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
   at 
org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
   at
   
org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
   at 
org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:191)
   at
   
org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
   at 
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
   at
   org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
   at 
org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:159)
   at
   org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
   at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:308)
   at
   org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
   at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
   at
   org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
   at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
   at
   org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
   at 
org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.uncheckedClone(InMemoryStateInternals.java:648)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals.access$000(InMemoryStateInternals.java:62)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:221)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.InMemoryStateInternals$InMemoryValue.copy(InMemoryStateInternals.java:181)
   at
   
org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals$CopyOnAccessInMemoryStateTable$CopyOnBindBinderFactory$1.bindValue(CopyOnAccessInMemoryStateInternals.java:301)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.StateTags$2.bindValue(StateTags.java:71)
   at
   org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:278)
   at 
org.apache.beam.sdk.state.StateSpecs$ValueStateSpec.bind(StateSpecs.java:267)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.StateTags$SimpleStateTag.bind(StateTags.java:324)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.StateTable.get(StateTable.java:60)
   at 
org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:127)
   at
   
org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals.state(CopyOnAccessInMemoryStateInternals.java:121)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.state(SimpleDoFnRunner.java:518)
   at
   
ca.cbc.recsysdataflow.transforms.streaming.GetPredictionsFn$DoFnInvoker.invokeProcessElement(Unknown
   Source)
   at 
org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
   at
   
org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:72)
   at
   
org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:179)
   at 
org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
   at
   
org.apache.beam.runners.direct.StatefulParDoEvaluatorFactory$StatefulParDoEvaluator.processElement(StatefulParDoEvaluatorFactory.java:245)
   at
   
org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
   at
   
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at
   java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)
   
   ```
   
   
   Imported from Jira 
[BEAM-4301](https://issues.apache.org/jira/browse/BEAM-4301). Original Jira may 
contain additional context.
   Reported by: sx5640.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to