jlentin opened a new issue, #24801: URL: https://github.com/apache/beam/issues/24801
### What happened?

We are trying to upgrade Beam from version `2.31.0` to `2.43.0`. While upgrading, we noticed that the `@Nullable` annotation has been removed from `org.apache.beam.sdk.state.ReadableState.read()`. This causes our code quality checks to fail wherever we null-check the return value of `read()`.

I traced it back to this PR: https://github.com/apache/beam/pull/16721, which removed the `@Nullable` annotation.

Here is example code:

```java
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;
import org.junit.jupiter.api.Test; // JUnit 5 assumed from the package-private test method
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps a state of the current total. Keeps accumulating the new value to the total, and outputs
 * the total value.
 */
class AccumulateDoFn extends DoFn<KV<String, Integer>, KV<String, Integer>> {

  private static final Logger LOGGER = LoggerFactory.getLogger(AccumulateDoFn.class);

  @StateId("PREV_SUM_STATE")
  private static final StateSpec<ValueState<Integer>> PREV_SUM_STATE =
      StateSpecs.value(BigEndianIntegerCoder.of());

  @ProcessElement
  public void process(
      @Element final KV<String, Integer> element,
      @Timestamp final Instant time,
      @StateId("PREV_SUM_STATE") final ValueState<Integer> prevSumState,
      final OutputReceiver<KV<String, Integer>> outputReceiver) {
    final String key = element.getKey();
    final Integer currentValue = element.getValue();

    Integer previousSum = prevSumState.read();
    if (previousSum == null) {
      // This branch is executed, so `ReadableState.read()` can return null.
      LOGGER.info("Current state is null. Will initialize with 0");
      previousSum = 0;
    }

    final Integer newSum = previousSum + currentValue;
    LOGGER.info("For {}, after {}, new sum is {}", key, currentValue, newSum);
    prevSumState.write(newSum);
    outputReceiver.outputWithTimestamp(KV.of(key, newSum), time);
  }
}

/** Tests for {@link AccumulateDoFn}. */
class AccumulateDoFnTest {

  @Test
  void testSum() {
    final TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    final List<TimestampedValue<KV<String, Integer>>> values =
        List.of(
            TimestampedValue.of(KV.of("key1", 2), Instant.EPOCH),
            TimestampedValue.of(KV.of("key2", 3), Instant.EPOCH.plus(1L)),
            TimestampedValue.of(KV.of("key1", 5), Instant.EPOCH.plus(2L)));

    final PCollection<KV<String, Integer>> result =
        pipeline
            .apply(Create.timestamped(values))
            .apply(ParDo.of(new AccumulateDoFn()))
            .apply(Latest.perKey());

    PAssert.thatMap(result).isEqualTo(Map.of("key1", 7, "key2", 3));
    pipeline.run().waitUntilFinish();
  }
}
```

### Issue Priority

Priority: 3 (minor)

### Issue Components

- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
