NonNull by default in sdk/state
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/524d8249 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/524d8249 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/524d8249 Branch: refs/heads/master Commit: 524d824915203da4949d08bdc6bebb6abcb90f55 Parents: 51118fb Author: Kenneth Knowles <[email protected]> Authored: Thu Oct 19 19:54:46 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Fri Oct 20 14:51:42 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/runners/core/SideInputHandler.java | 2 +- .../main/java/org/apache/beam/runners/core/WatermarkHold.java | 6 +++--- .../beam/runners/core/triggers/TriggerStateMachineRunner.java | 3 ++- .../core/src/main/java/org/apache/beam/sdk/state/BagState.java | 6 ++++++ .../main/java/org/apache/beam/sdk/state/CombiningState.java | 5 +++++ .../src/main/java/org/apache/beam/sdk/state/ReadableState.java | 4 +++- .../src/main/java/org/apache/beam/sdk/state/StateSpecs.java | 4 ++-- .../src/main/java/org/apache/beam/sdk/state/package-info.java | 4 ++++ 8 files changed, 26 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/524d8249/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java index 539b9f0..3b37702 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java @@ -174,7 +174,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { ValueState<Iterable<WindowedValue<?>>> state = stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag); - Iterable<WindowedValue<?>> elements = state.read(); + @Nullable Iterable<WindowedValue<?>> elements = state.read(); if (elements == null) { elements = Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/beam/blob/524d8249/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index 13e4c43..8859bbb 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -483,9 +483,9 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { @Override public OldAndNewHolds read() { // Read both the element and extra holds. - Instant elementHold = elementHoldState.read(); - Instant extraHold = extraHoldState.read(); - Instant oldHold; + @Nullable Instant elementHold = elementHoldState.read(); + @Nullable Instant extraHold = extraHoldState.read(); + @Nullable Instant oldHold; // Find the minimum, accounting for null. if (elementHold == null) { oldHold = extraHold; http://git-wip-us.apache.org/repos/asf/beam/blob/524d8249/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java index 88ea6ef..b643a7b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import java.util.BitSet; import java.util.Collection; import java.util.Map; +import javax.annotation.Nullable; import org.apache.beam.runners.core.MergingStateAccessor; import org.apache.beam.runners.core.StateAccessor; import org.apache.beam.runners.core.StateTag; @@ -79,7 +80,7 @@ public class TriggerStateMachineRunner<W extends BoundedWindow> { return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree()); } - BitSet bitSet = state.read(); + @Nullable BitSet bitSet = state.read(); return bitSet == null ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree()) : FinishedTriggersBitSet.fromBitSet(bitSet); http://git-wip-us.apache.org/repos/asf/beam/blob/524d8249/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java index 76d3e32..a4af6eb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.state; +import javax.annotation.Nonnull; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -31,6 +32,11 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; */ @Experimental(Kind.STATE) public interface BagState<T> extends GroupingState<T, Iterable<T>> { + + @Override + @Nonnull + Iterable<T> read(); + @Override BagState<T> readLater(); } http://git-wip-us.apache.org/repos/asf/beam/blob/524d8249/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java index 94a36d3..5cf4229 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.state; +import javax.annotation.Nonnull; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.transforms.Combine.CombineFn; @@ -35,6 +36,10 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; @Experimental(Kind.STATE) public interface CombiningState<InputT, AccumT, OutputT> extends GroupingState<InputT, OutputT> { + @Override + @Nonnull + OutputT read(); + /** * Read the merged accumulator for this state cell. It is implied that reading the state involves * reading the accumulator, so {@link #readLater} is sufficient to prefetch for this. http://git-wip-us.apache.org/repos/asf/beam/blob/524d8249/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java index f2774ba..dec064a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.state; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -37,10 +38,11 @@ public interface ReadableState<T> { * should first call {@link #readLater} for all of them so the reads can potentially be batched * (depending on the underlying implementation}. * - * <p>The returned object should be independent of the underlying state. Any direct modification + * <p>The returned object should be independent of the underlying state. Any direct modification * of the returned object should not modify state without going through the appropriate state * interface, and modification to the state should not be mirrored in the returned object. */ + @Nullable T read(); /** http://git-wip-us.apache.org/repos/asf/beam/blob/524d8249/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java index 4222304..360d9d3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java @@ -131,7 +131,7 @@ public class StateSpecs { * @see #bag(Coder) */ public static <T> StateSpec<BagState<T>> bag() { - return bag(null); + return new BagStateSpec<>(null); } /** @@ -151,7 +151,7 @@ public class StateSpecs { * @see #set(Coder) */ public static <T> StateSpec<SetState<T>> set() { - return set(null); + return new SetStateSpec<>(null); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/524d8249/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java index d8b8e92..01570f0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java @@ -19,4 +19,8 @@ /** * Classes and interfaces for interacting with state. */ +@DefaultAnnotation(NonNull.class) package org.apache.beam.sdk.state; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull;
