jlentin opened a new issue, #24801:
URL: https://github.com/apache/beam/issues/24801

   ### What happened?
   
   We are trying to upgrade version of Beam from version `2.31.0` to `2.43.0`
   
   While upgrading, we noticed that `@Nullable` annotation has been removed 
from `org.apache.beam.sdk.state.ReadableState.read()`. This is causing our code 
quality checks to fail when we check null on the return values of the `read` 
method.
   
   I traced it back to this PR: https://github.com/apache/beam/pull/16721 which 
removed the `@Nullable` annotation
   
   Here is an example code:
   
   ```java
   /**
    * Keeps a state of current total. Keeps accumulating the new value to the 
total, and outputs the
    * total value.
    */
   class AccumulateDoFn extends DoFn<KV<String, Integer>, KV<String, Integer>> {
   
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AccumulateDoFn.class);
   
     @StateId("PREV_SUM_STATE")
     private static final StateSpec<ValueState<Integer>> PREV_SUM_STATE =
         StateSpecs.value(BigEndianIntegerCoder.of());
   
     @ProcessElement
     public void process(
         @Element final KV<String, Integer> element,
         @Timestamp final Instant time,
         @StateId("PREV_SUM_STATE") final ValueState<Integer> prevSumState,
         final OutputReceiver<KV<String, Integer>> outputReceiver) {
   
       final String key = element.getKey();
       final Integer currentValue = element.getValue();
   
       Integer previousSum = prevSumState.read();
       if (previousSum == null) {
         // This line is executed. So `ReadableState.read()` can return null.
         LOGGER.info("Current state is null. Will initialize with 0");
         previousSum = 0;
       }
   
       final Integer newSum = previousSum + currentValue;
       LOGGER.info("For {}, after {}, new sum is {}", key, currentValue, 
newSum);
       prevSumState.write(newSum);
   
       outputReceiver.outputWithTimestamp(KV.of(key, newSum), time);
     }
   }
   
   
   /** Tests for {@link AccumulateDoFn}. */
   class AccumulateDoFnTest {
   
     @Test
     void testSum() {
   
       final TestPipeline pipeline = 
TestPipeline.create().enableAbandonedNodeEnforcement(false);
   
       final List<TimestampedValue<KV<String, Integer>>> values =
           List.of(
               TimestampedValue.of(KV.of("key1", 2), Instant.EPOCH),
               TimestampedValue.of(KV.of("key2", 3), Instant.EPOCH.plus(1L)),
               TimestampedValue.of(KV.of("key1", 5), Instant.EPOCH.plus(2L)));
   
       final PCollection<KV<String, Integer>> result =
           pipeline
               .apply(Create.timestamped(values))
               .apply(ParDo.of(new AccumulateDoFn()))
               .apply(Latest.perKey());
   
       PAssert.thatMap(result).isEqualTo(Map.of("key1", 7, "key2", 3));
       pipeline.run().waitUntilFinish();
     }
   }
   
   ```
   
   ### Issue Priority
   
   Priority: 3 (minor)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to