http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java deleted file mode 100644 index 48fa742..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.CombineWithContext; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; - -/** - * Visitor for binding a {@link StateSpec} and to the associated {@link State}. - */ -public interface StateBinder { - <T> ValueState<T> bindValue( - String id, StateSpec<ValueState<T>> spec, Coder<T> coder); - - <T> BagState<T> bindBag( - String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder); - - <T> SetState<T> bindSet( - String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder); - - <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( - String id, - StateSpec<MapState<KeyT, ValueT>> spec, - Coder<KeyT> mapKeyCoder, - Coder<ValueT> mapValueCoder); - - <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining( - String id, - StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - Combine.CombineFn<InputT, AccumT, OutputT> combineFn); - - <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext( - String id, - StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn); - - /** - * Bind to a watermark {@link StateSpec}. - * - * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added - * to the returned {@link WatermarkHoldState} are to be combined. - */ - WatermarkHoldState bindWatermark( - String id, - StateSpec<WatermarkHoldState> spec, - TimestampCombiner timestampCombiner); -}
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContext.java deleted file mode 100644 index 887a5f1..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContext.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.PCollectionView; - -/** - * Information accessible the state API. - */ -public interface StateContext<W extends BoundedWindow> { - /** - * Returns the {@code PipelineOptions} specified with the - * {@link org.apache.beam.sdk.runners.PipelineRunner}. - */ - PipelineOptions getPipelineOptions(); - - /** - * Returns the value of the side input for the corresponding state window. - */ - <T> T sideInput(PCollectionView<T> view); - - /** - * Returns the window corresponding to the state. - */ - W window(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java deleted file mode 100644 index 2ce9594..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateContexts.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.PCollectionView; - -/** - * Factory that produces {@link StateContext} based on different inputs. - */ -public class StateContexts { - private static final StateContext<BoundedWindow> NULL_CONTEXT = - new StateContext<BoundedWindow>() { - @Override - public PipelineOptions getPipelineOptions() { - throw new IllegalArgumentException("cannot call getPipelineOptions() in a null context"); - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - throw new IllegalArgumentException("cannot call sideInput() in a null context"); - } - - @Override - public BoundedWindow window() { - throw new IllegalArgumentException("cannot call window() in a null context"); - } - }; - - /** Returns a fake {@link StateContext}. */ - @SuppressWarnings("unchecked") - public static <W extends BoundedWindow> StateContext<W> nullContext() { - return (StateContext<W>) NULL_CONTEXT; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java deleted file mode 100644 index 8eda218..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import java.io.Serializable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.coders.Coder; - -/** - * A specification of a persistent state cell. This includes information necessary to encode the - * value and details about the intended access pattern. - * - * @param <StateT> The type of state being described. - */ -@Experimental(Kind.STATE) -public interface StateSpec<StateT extends State> extends Serializable { - - /** - * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address. - */ - StateT bind(String id, StateBinder binder); - - /** - * Given {code coders} are inferred from type arguments defined for this class. Coders which are - * already set should take precedence over offered coders. - * - * @param coders Array of coders indexed by the type arguments order. Entries might be null if the - * coder could not be inferred. - */ - void offerCoders(Coder[] coders); - - /** - * Validates that this {@link StateSpec} has been specified correctly and finalizes it. - * Automatically invoked when the pipeline is built. - */ - void finishSpecifying(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java deleted file mode 100644 index 49d5722..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java +++ /dev/null @@ -1,629 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.util.Objects; -import javax.annotation.Nullable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; - -/** - * Static utility methods for creating {@link StateSpec} instances. - */ -@Experimental(Kind.STATE) -public class StateSpecs { - - private static final CoderRegistry STANDARD_REGISTRY = CoderRegistry.createDefault(); - - private StateSpecs() {} - - /** Create a simple state spec for values of type {@code T}. */ - public static <T> StateSpec<ValueState<T>> value() { - return new ValueStateSpec<>(null); - } - - /** Create a simple state spec for values of type {@code T}. */ - public static <T> StateSpec<ValueState<T>> value(Coder<T> valueCoder) { - checkArgument(valueCoder != null, "valueCoder should not be null. Consider value() instead"); - return new ValueStateSpec<>(valueCoder); - } - - /** - * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple - * {@code InputT}s into a single {@code OutputT}. - */ - public static <InputT, AccumT, OutputT> - StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( - CombineFn<InputT, AccumT, OutputT> combineFn) { - return new CombiningStateSpec<InputT, AccumT, OutputT>(null, combineFn); - } - - /** - * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge - * multiple {@code InputT}s into a single {@code OutputT}. - */ - public static <InputT, AccumT, OutputT> - StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( - CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { - return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(null, combineFn); - } - - /** - * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple - * {@code InputT}s into a single {@code OutputT}. - */ - public static <InputT, AccumT, OutputT> - StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( - Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { - checkArgument(accumCoder != null, - "accumCoder should not be null. " - + "Consider using combining(CombineFn<> combineFn) instead."); - return combiningInternal(accumCoder, combineFn); - } - - /** - * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge - * multiple {@code InputT}s into a single {@code OutputT}. - */ - public static <InputT, AccumT, OutputT> - StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( - Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { - return combiningInternal(accumCoder, combineFn); - } - - /** - * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple - * {@code InputT}s into a single {@code OutputT}. - * - * <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and should - * only be used to initialize static values. - */ - public static <InputT, AccumT, OutputT> - StateSpec<CombiningState<InputT, AccumT, OutputT>> - combiningFromInputInternal( - Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { - try { - Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder); - return combiningInternal(accumCoder, combineFn); - } catch (CannotProvideCoderException e) { - throw new IllegalArgumentException( - "Unable to determine accumulator coder for " - + combineFn.getClass().getSimpleName() - + " from " - + inputCoder, - e); - } - } - - private static <InputT, AccumT, OutputT> - StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal( - Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { - return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); - } - - private static <InputT, AccumT, OutputT> - StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal( - Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { - return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); - } - - /** - * Create a state spec that is optimized for adding values frequently, and occasionally retrieving - * all the values that have been added. - */ - public static <T> StateSpec<BagState<T>> bag() { - return bag(null); - } - - /** - * Create a state spec that is optimized for adding values frequently, and occasionally retrieving - * all the values that have been added. - */ - public static <T> StateSpec<BagState<T>> bag(Coder<T> elemCoder) { - return new BagStateSpec<>(elemCoder); - } - - /** - * Create a state spec that supporting for {@link java.util.Set} like access patterns. - */ - public static <T> StateSpec<SetState<T>> set() { - return set(null); - } - - /** - * Create a state spec that supporting for {@link java.util.Set} like access patterns. - */ - public static <T> StateSpec<SetState<T>> set(Coder<T> elemCoder) { - return new SetStateSpec<>(elemCoder); - } - - /** - * Create a state spec that supporting for {@link java.util.Map} like access patterns. - */ - public static <K, V> StateSpec<MapState<K, V>> map() { - return new MapStateSpec<>(null, null); - } - - /** Create a state spec that supporting for {@link java.util.Map} like access patterns. */ - public static <K, V> StateSpec<MapState<K, V>> map(Coder<K> keyCoder, Coder<V> valueCoder) { - return new MapStateSpec<>(keyCoder, valueCoder); - } - - /** Create a state spec for holding the watermark. */ - public static - StateSpec<WatermarkHoldState> watermarkStateInternal( - TimestampCombiner timestampCombiner) { - return new WatermarkStateSpecInternal(timestampCombiner); - } - - public static <InputT, AccumT, OutputT> - StateSpec<BagState<AccumT>> convertToBagSpecInternal( - StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec) { - if (combiningSpec instanceof CombiningStateSpec) { - // Checked above; conversion to a bag spec depends on the provided spec being one of those - // created via the factory methods in this class. - @SuppressWarnings("unchecked") - CombiningStateSpec<InputT, AccumT, OutputT> typedSpec = - (CombiningStateSpec<InputT, AccumT, OutputT>) combiningSpec; - return typedSpec.asBagSpec(); - } else if (combiningSpec instanceof CombiningWithContextStateSpec) { - @SuppressWarnings("unchecked") - CombiningWithContextStateSpec<InputT, AccumT, OutputT> typedSpec = - (CombiningWithContextStateSpec<InputT, AccumT, OutputT>) combiningSpec; - return typedSpec.asBagSpec(); - } else { - throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec); - } - } - - /** - * A specification for a state cell holding a settable value of type {@code T}. - * - * <p>Includes the coder for {@code T}. - */ - private static class ValueStateSpec<T> implements StateSpec<ValueState<T>> { - - @Nullable - private Coder<T> coder; - - private ValueStateSpec(@Nullable Coder<T> coder) { - this.coder = coder; - } - - @Override - public ValueState<T> bind(String id, StateBinder visitor) { - return visitor.bindValue(id, this, coder); - } - - @SuppressWarnings("unchecked") - @Override - public void offerCoders(Coder[] coders) { - if (this.coder == null) { - if (coders[0] != null) { - this.coder = (Coder<T>) coders[0]; - } - } - } - - @Override public void finishSpecifying() { - if (coder == null) { - throw new IllegalStateException("Unable to infer a coder for ValueState and no Coder" - + " was specified. Please set a coder by either invoking" - + " StateSpecs.value(Coder<T> valueCoder) or by registering the coder in the" - + " Pipeline's CoderRegistry."); - } - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof ValueStateSpec)) { - return false; - } - - ValueStateSpec<?> that = (ValueStateSpec<?>) obj; - return Objects.equals(this.coder, that.coder); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), coder); - } - } - - /** - * A specification for a state cell that is combined according to a {@link CombineFn}. - * - * <p>Includes the {@link CombineFn} and the coder for the accumulator type. - */ - private static class CombiningStateSpec<InputT, AccumT, OutputT> - implements StateSpec<CombiningState<InputT, AccumT, OutputT>> { - - @Nullable - private Coder<AccumT> accumCoder; - private final CombineFn<InputT, AccumT, OutputT> combineFn; - - private CombiningStateSpec( - @Nullable Coder<AccumT> accumCoder, - CombineFn<InputT, AccumT, OutputT> combineFn) { - this.combineFn = combineFn; - this.accumCoder = accumCoder; - } - - @Override - public CombiningState<InputT, AccumT, OutputT> bind( - String id, StateBinder visitor) { - return visitor.bindCombining(id, this, accumCoder, combineFn); - } - - @SuppressWarnings("unchecked") - @Override - public void offerCoders(Coder[] coders) { - if (this.accumCoder == null) { - if (coders[1] != null) { - this.accumCoder = (Coder<AccumT>) coders[1]; - } - } - } - - @Override public void finishSpecifying() { - if (accumCoder == null) { - throw new IllegalStateException("Unable to infer a coder for" - + " CombiningState and no Coder was specified." - + " Please set a coder by either invoking" - + " StateSpecs.combining(Coder<AccumT> accumCoder," - + " CombineFn<InputT, AccumT, OutputT> combineFn)" - + " or by registering the coder in the Pipeline's CoderRegistry."); - } - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof CombiningStateSpec)) { - return false; - } - - CombiningStateSpec<?, ?, ?> that = - (CombiningStateSpec<?, ?, ?>) obj; - return Objects.equals(this.accumCoder, that.accumCoder); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), accumCoder); - } - - private StateSpec<BagState<AccumT>> asBagSpec() { - return new BagStateSpec<AccumT>(accumCoder); - } - } - - /** - * A specification for a state cell that is combined according to a {@link - * CombineFnWithContext}. - * - * <p>Includes the {@link CombineFnWithContext} and the coder for the accumulator type. - */ - private static class CombiningWithContextStateSpec<InputT, AccumT, OutputT> - implements StateSpec<CombiningState<InputT, AccumT, OutputT>> { - - @Nullable private Coder<AccumT> accumCoder; - private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn; - - private CombiningWithContextStateSpec( - @Nullable Coder<AccumT> accumCoder, - CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { - this.combineFn = combineFn; - this.accumCoder = accumCoder; - } - - @Override - public CombiningState<InputT, AccumT, OutputT> bind( - String id, StateBinder visitor) { - return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn); - } - - @SuppressWarnings("unchecked") - @Override - public void offerCoders(Coder[] coders) { - if (this.accumCoder == null) { - if (coders[2] != null) { - this.accumCoder = (Coder<AccumT>) coders[2]; - } - } - } - - @Override - public void finishSpecifying() { - if (accumCoder == null) { - throw new IllegalStateException( - "Unable to infer a coder for" - + " CombiningWithContextState and no Coder was specified." - + " Please set a coder by either invoking" - + " StateSpecs.combiningWithcontext(Coder<AccumT> accumCoder," - + " CombineFnWithContext<InputT, AccumT, OutputT> combineFn)" - + " or by registering the coder in the Pipeline's CoderRegistry."); - } - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof CombiningWithContextStateSpec)) { - return false; - } - - CombiningWithContextStateSpec<?, ?, ?> that = (CombiningWithContextStateSpec<?, ?, ?>) obj; - return Objects.equals(this.accumCoder, that.accumCoder); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), accumCoder); - } - - private StateSpec<BagState<AccumT>> asBagSpec() { - return new BagStateSpec<AccumT>(accumCoder); - } - } - - /** - * A specification for a state cell supporting for bag-like access patterns - * (frequent additions, occasional reads of all the values). - * - * <p>Includes the coder for the element type {@code T}</p> - */ - private static class BagStateSpec<T> implements StateSpec<BagState<T>> { - - @Nullable - private Coder<T> elemCoder; - - private BagStateSpec(@Nullable Coder<T> elemCoder) { - this.elemCoder = elemCoder; - } - - @Override - public BagState<T> bind(String id, StateBinder visitor) { - return visitor.bindBag(id, this, elemCoder); - } - - @SuppressWarnings("unchecked") - @Override - public void offerCoders(Coder[] coders) { - if (this.elemCoder == null) { - if (coders[0] != null) { - this.elemCoder = (Coder<T>) coders[0]; - } - } - } - - @Override public void finishSpecifying() { - if (elemCoder == null) { - throw new IllegalStateException("Unable to infer a coder for BagState and no Coder" - + " was specified. Please set a coder by either invoking" - + " StateSpecs.bag(Coder<T> elemCoder) or by registering the coder in the" - + " Pipeline's CoderRegistry."); - } - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof BagStateSpec)) { - return false; - } - - BagStateSpec<?> that = (BagStateSpec<?>) obj; - return Objects.equals(this.elemCoder, that.elemCoder); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), elemCoder); - } - } - - private static class MapStateSpec<K, V> implements StateSpec<MapState<K, V>> { - - @Nullable - private Coder<K> keyCoder; - @Nullable - private Coder<V> valueCoder; - - private MapStateSpec(@Nullable Coder<K> keyCoder, @Nullable Coder<V> valueCoder) { - this.keyCoder = keyCoder; - this.valueCoder = valueCoder; - } - - @Override - public MapState<K, V> bind(String id, StateBinder visitor) { - return visitor.bindMap(id, this, keyCoder, valueCoder); - } - - @SuppressWarnings("unchecked") - @Override - public void offerCoders(Coder[] coders) { - if (this.keyCoder == null) { - if (coders[0] != null) { - this.keyCoder = (Coder<K>) coders[0]; - } - } - if (this.valueCoder == null) { - if (coders[1] != null) { - this.valueCoder = (Coder<V>) coders[1]; - } - } - } - - @Override public void finishSpecifying() { - if (keyCoder == null || valueCoder == null) { - throw new IllegalStateException("Unable to infer a coder for MapState and no Coder" - + " was specified. Please set a coder by either invoking" - + " StateSpecs.map(Coder<K> keyCoder, Coder<V> valueCoder) or by registering the" - + " coder in the Pipeline's CoderRegistry."); - } - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof MapStateSpec)) { - return false; - } - - MapStateSpec<?, ?> that = (MapStateSpec<?, ?>) obj; - return Objects.equals(this.keyCoder, that.keyCoder) - && Objects.equals(this.valueCoder, that.valueCoder); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), keyCoder, valueCoder); - } - } - - /** - * A specification for a state cell supporting for set-like access patterns. - * - * <p>Includes the coder for the element type {@code T}</p> - */ - private static class SetStateSpec<T> implements StateSpec<SetState<T>> { - - @Nullable - private Coder<T> elemCoder; - - private SetStateSpec(@Nullable Coder<T> elemCoder) { - this.elemCoder = elemCoder; - } - - @Override - public SetState<T> bind(String id, StateBinder visitor) { - return visitor.bindSet(id, this, elemCoder); - } - - @SuppressWarnings("unchecked") - @Override - public void offerCoders(Coder[] coders) { - if (this.elemCoder == null) { - if (coders[0] != null) { - this.elemCoder = (Coder<T>) coders[0]; - } - } - } - - @Override public void finishSpecifying() { - if (elemCoder == null) { - throw new IllegalStateException("Unable to infer a coder for SetState and no Coder" - + " was specified. Please set a coder by either invoking" - + " StateSpecs.set(Coder<T> elemCoder) or by registering the coder in the" - + " Pipeline's CoderRegistry."); - } - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof SetStateSpec)) { - return false; - } - - SetStateSpec<?> that = (SetStateSpec<?>) obj; - return Objects.equals(this.elemCoder, that.elemCoder); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), elemCoder); - } - } - - /** - * A specification for a state cell tracking a combined watermark hold. - * - * <p>Includes the {@link TimestampCombiner} according to which the output times - * are combined. - */ - private static class WatermarkStateSpecInternal implements StateSpec<WatermarkHoldState> { - - /** - * When multiple output times are added to hold the watermark, this determines how they are - * combined, and also the behavior when merging windows. Does not contribute to equality/hash - * since we have at most one watermark hold spec per computation. - */ - private final TimestampCombiner timestampCombiner; - - private WatermarkStateSpecInternal(TimestampCombiner timestampCombiner) { - this.timestampCombiner = timestampCombiner; - } - - @Override - public WatermarkHoldState bind(String id, StateBinder visitor) { - return visitor.bindWatermark(id, this, timestampCombiner); - } - - @Override - public void offerCoders(Coder[] coders) { - } - - @Override public void finishSpecifying() { - // Currently an empty implementation as there are no coders to validate. - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - // All instance of WatermarkHoldState are considered equal - return obj instanceof WatermarkStateSpecInternal; - } - - @Override - public int hashCode() { - return Objects.hash(getClass()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ValueState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ValueState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ValueState.java deleted file mode 100644 index b432203..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ValueState.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; - -/** - * State holding a single value. - * - * @param <T> The type of values being stored. - */ -@Experimental(Kind.STATE) -public interface ValueState<T> extends ReadableState<T>, State { - /** - * Set the value of the buffer. - */ - void write(T input); - - @Override - ValueState<T> readLater(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java deleted file mode 100644 index ae9b700..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.joda.time.Instant; - -/** - * A {@link State} accepting and aggregating output timestamps, which determines the time to which - * the output watermark must be held. - * - * <p><b><i>For internal use only. This API may change at any time.</i></b> - */ -@Experimental(Kind.STATE) -public interface WatermarkHoldState extends GroupingState<Instant, Instant> { - /** - * Return the {@link TimestampCombiner} which will be used to determine a watermark hold time - * given an element timestamp, and to combine watermarks from windows which are about to be - * merged. - */ - TimestampCombiner getTimestampCombiner(); - - @Override - WatermarkHoldState readLater(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/package-info.java deleted file mode 100644 index b9bec16..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Defines internal utilities for interacting with pipeline state. - */ -package org.apache.beam.sdk.util.state; http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 56051a6..1d41923 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -64,6 +64,13 @@ import org.apache.beam.sdk.coders.SetCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.CombiningState; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.SetState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -92,13 +99,6 @@ import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CombiningState; -import org.apache.beam.sdk.util.state.MapState; -import org.apache.beam.sdk.util.state.SetState; -import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateSpecs; -import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index fe96e87..13e46d5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -38,6 +38,9 @@ import java.util.List; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider; import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; @@ -50,9 +53,6 @@ import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateSpecs; -import org.apache.beam.sdk.util.state.ValueState; import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java index f099d5d..27e0b89 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -29,6 +29,10 @@ import static org.junit.Assert.fail; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.state.WatermarkHoldState; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ProcessContextParameter; @@ -41,10 +45,6 @@ import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.TimerSpecs; -import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateSpecs; -import org.apache.beam.sdk.util.state.ValueState; -import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TypeDescriptor; import org.hamcrest.Matcher; http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java index 798e8dc..d16671b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java @@ -27,10 +27,10 @@ import java.io.ByteArrayOutputStream; import java.io.NotSerializableException; import java.io.ObjectOutputStream; import java.util.List; +import org.apache.beam.sdk.state.StateContexts; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.Context; import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.util.state.StateContexts; import org.junit.Before; import org.junit.Rule; import org.junit.Test;
