Repository: beam Updated Branches: refs/heads/master a5e6a0f12 -> b44e99b09
Revise javadoc for sdk, state, options, annotations, window, (and misc) Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/849f1225 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/849f1225 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/849f1225 Branch: refs/heads/master Commit: 849f12253973e8f7ff153a0ee4b11371cec5fc12 Parents: a5e6a0f Author: Kenneth Knowles <[email protected]> Authored: Mon May 8 21:28:37 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 9 15:26:22 2017 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/beam/sdk/Pipeline.java | 57 +++++-------- .../org/apache/beam/sdk/PipelineResult.java | 11 ++- .../org/apache/beam/sdk/PipelineRunner.java | 10 +-- .../beam/sdk/annotations/Experimental.java | 25 +++--- .../apache/beam/sdk/annotations/Internal.java | 6 +- .../org/apache/beam/sdk/coders/AtomicCoder.java | 6 +- .../main/java/org/apache/beam/sdk/io/Read.java | 6 +- .../DefaultPipelineOptionsRegistrar.java | 3 +- .../beam/sdk/options/PipelineOptionSpec.java | 4 +- .../apache/beam/sdk/options/ValueProvider.java | 15 ++-- .../org/apache/beam/sdk/state/BagState.java | 6 +- .../apache/beam/sdk/state/CombiningState.java | 23 +++--- .../apache/beam/sdk/state/GroupingState.java | 17 ++-- .../org/apache/beam/sdk/state/MapState.java | 27 ++---- .../apache/beam/sdk/state/ReadableState.java | 16 ++-- .../org/apache/beam/sdk/state/SetState.java | 14 ++-- .../java/org/apache/beam/sdk/state/State.java | 6 +- .../org/apache/beam/sdk/state/StateBinder.java | 1 - .../org/apache/beam/sdk/state/StateContext.java | 1 - .../apache/beam/sdk/state/StateContexts.java | 5 +- .../org/apache/beam/sdk/state/StateSpecs.java | 86 ++++++++++++++------ .../org/apache/beam/sdk/state/TimeDomain.java | 8 +- .../java/org/apache/beam/sdk/state/Timer.java | 12 ++- .../org/apache/beam/sdk/state/TimerSpecs.java | 4 +- 
.../org/apache/beam/sdk/state/ValueState.java | 8 +- .../beam/sdk/state/WatermarkHoldState.java | 4 +- .../beam/sdk/transforms/windowing/AfterAll.java | 4 +- .../sdk/transforms/windowing/AfterFirst.java | 3 +- .../sdk/transforms/windowing/AfterPane.java | 3 +- .../windowing/AfterProcessingTime.java | 4 +- .../transforms/windowing/DefaultTrigger.java | 8 +- .../sdk/transforms/windowing/GlobalWindows.java | 5 +- .../transforms/windowing/InvalidWindows.java | 2 +- .../MergeOverlappingIntervalWindows.java | 6 +- .../beam/sdk/transforms/windowing/Never.java | 2 +- .../windowing/NonMergingWindowFn.java | 2 +- .../transforms/windowing/OrFinallyTrigger.java | 5 +- .../sdk/transforms/windowing/Repeatedly.java | 4 +- .../beam/sdk/transforms/windowing/Sessions.java | 13 +-- .../transforms/windowing/TimestampCombiner.java | 5 ++ .../beam/sdk/transforms/windowing/Window.java | 12 +-- .../beam/sdk/transforms/windowing/WindowFn.java | 10 +-- .../transforms/windowing/WindowMappingFn.java | 12 ++- .../java/org/apache/beam/sdk/values/PBegin.java | 26 +++--- 44 files changed, 271 insertions(+), 236 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index f4da6ad..83496a5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -53,19 +53,14 @@ import org.slf4j.LoggerFactory; /** * A {@link Pipeline} manages a directed acyclic graph of {@link PTransform PTransforms}, and the - * {@link PCollection PCollections} that the {@link PTransform}s consume and produce. 
- * - * <p>A {@link Pipeline} is initialized with a {@link PipelineRunner} that will later - * execute the {@link Pipeline}. - * - * <p>{@link Pipeline Pipelines} are independent, so they can be constructed and executed - * concurrently. + * {@link PCollection PCollections} that the {@link PTransform PTransforms} consume and produce. * * <p>Each {@link Pipeline} is self-contained and isolated from any other * {@link Pipeline}. The {@link PValue PValues} that are inputs and outputs of each of a * {@link Pipeline Pipeline's} {@link PTransform PTransforms} are also owned by that * {@link Pipeline}. A {@link PValue} owned by one {@link Pipeline} can be read only by - * {@link PTransform PTransforms} also owned by that {@link Pipeline}. + * {@link PTransform PTransforms} also owned by that {@link Pipeline}. {@link Pipeline Pipelines} + * can safely be executed concurrently. * * <p>Here is a typical example of use: * <pre> {@code @@ -130,9 +125,7 @@ public class Pipeline { // Public operations. /** - * Constructs a pipeline from default options. - * - * @return The newly created pipeline. + * Constructs a pipeline from default {@link PipelineOptions}. */ public static Pipeline create() { Pipeline pipeline = new Pipeline(PipelineOptionsFactory.create()); @@ -141,9 +134,7 @@ public class Pipeline { } /** - * Constructs a pipeline from the provided options. - * - * @return The newly created pipeline. + * Constructs a pipeline from the provided {@link PipelineOptions}. */ public static Pipeline create(PipelineOptions options) { // TODO: fix runners that mutate PipelineOptions in this method, then remove this line @@ -155,9 +146,8 @@ public class Pipeline { } /** - * Returns a {@link PBegin} owned by this Pipeline. This is useful - * as the input of a root PTransform such as {@link Read} or - * {@link Create}. + * Returns a {@link PBegin} owned by this Pipeline. This serves as the input of a root {@link + * PTransform} such as {@link Read} or {@link Create}. 
*/ public PBegin begin() { return PBegin.in(this); @@ -175,12 +165,12 @@ public class Pipeline { } /** - * Adds a root {@link PTransform}, such as {@link Read} or {@link Create}, - * to this {@link Pipeline}. + * Adds a root {@link PTransform}, such as {@link Read} or {@link Create}, to this {@link + * Pipeline}. * - * <p>The node in the {@link Pipeline} graph will use the provided {@code name}. - * This name is used in various places, including the monitoring UI, logging, - * and to stably identify this node in the {@link Pipeline} graph upon update. + * <p>The node in the {@link Pipeline} graph will use the provided {@code name}. This name is used + * in various places, including the monitoring UI, logging, and to stably identify this node in + * the {@link Pipeline} graph upon update. * * <p>Alias for {@code begin().apply(name, root)}. */ @@ -284,19 +274,16 @@ public class Pipeline { } /** - * Runs this {@link Pipeline} using the default {@link PipelineOptions} provided - * to {@link #create(PipelineOptions)}. - * - * <p>It is an error to call this method if the pipeline was created without - * a default set of options. + * Runs this {@link Pipeline} according to the {@link PipelineOptions} used to create the {@link + * Pipeline} via {@link #create(PipelineOptions)}. */ public PipelineResult run() { return run(defaultOptions); } /** - * Runs this {@link Pipeline} using the given {@link PipelineOptions}, using the runner - * specified by the options. + * Runs this {@link Pipeline} using the given {@link PipelineOptions}, using the runner specified + * by the options. */ public PipelineResult run(PipelineOptions options) { PipelineRunner runner = PipelineRunner.fromOptions(options); @@ -316,9 +303,6 @@ public class Pipeline { } - ///////////////////////////////////////////////////////////////////////////// - // Below here are operations that aren't normally called by users. - /** * Returns the {@link CoderRegistry} that this {@link Pipeline} uses. 
*/ @@ -329,9 +313,14 @@ public class Pipeline { return coderRegistry; } + ///////////////////////////////////////////////////////////////////////////// + // Below here are operations that aren't normally called by users. + /** - * Sets the {@link CoderRegistry} that this {@link Pipeline} uses. + * @deprecated this should never be used - every {@link Pipeline} has a registry throughout its + * lifetime. */ + @Deprecated public void setCoderRegistry(CoderRegistry coderRegistry) { this.coderRegistry = coderRegistry; } @@ -554,8 +543,6 @@ public class Pipeline { /** * Returns a unique name for a transform with the given prefix (from * enclosing transforms) and initial name. - * - * <p>For internal use only. */ private String uniquifyInternal(String namePrefix, String origName) { String name = origName; http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java index 7e78e6e..b60de63 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java @@ -25,6 +25,8 @@ import org.joda.time.Duration; /** * Result of {@link Pipeline#run()}. + * + * <p>This is often a job handle to an underlying data processing engine. */ public interface PipelineResult { @@ -65,7 +67,12 @@ public interface PipelineResult { // TODO: method to retrieve error messages. - /** Named constants for common values for the job state. */ + /** + * Possible job states, for both completed and ongoing jobs. + * + * <p>When determining if a job is still running, consult the {@link #isTerminal()} method rather + * than inspecting the precise state. + */ enum State { /** The job state could not be obtained or was not specified. 
*/ @@ -114,7 +121,7 @@ public interface PipelineResult { } /** - * Return the object to access metrics from the pipeline. + * Returns the object to access metrics from the pipeline. * * @throws UnsupportedOperationException if the runner doesn't support retrieving metrics. */ http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java index 18e79eb..87705af 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java @@ -25,15 +25,14 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.util.InstanceBuilder; /** - * A {@link PipelineRunner} can execute, translate, or otherwise process a - * {@link Pipeline}. + * A {@link PipelineRunner} runs a {@link Pipeline}. * - * @param <ResultT> the type of the result of {@link #run}. + * @param <ResultT> the type of the result of {@link #run}, often a handle to a running job. */ public abstract class PipelineRunner<ResultT extends PipelineResult> { /** - * Constructs a runner from the provided options. + * Constructs a runner from the provided {@link PipelineOptions}. * * @return The newly created runner. */ @@ -55,7 +54,8 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> { } /** - * Processes the given Pipeline, returning the results. + * Processes the given {@link Pipeline}, potentially asynchronously, returning a runner-specific + * type of result. 
*/ public abstract ResultT run(Pipeline pipeline); } http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java index 2e3a711..ac02465 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java @@ -24,25 +24,20 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** - * Signifies that a public API (public class, method or field) is subject to - * incompatible changes, or even removal, in a future release. An API bearing - * this annotation is exempt from any compatibility guarantees made by its - * containing library. Note that the presence of this annotation implies nothing - * about the quality or performance of the API in question, only the fact that - * it is not "API-frozen." + * Signifies that a public API (public class, method or field) is subject to incompatible changes, + * or even removal, in a future release. * - * <p>It is generally safe for <i>applications</i> to depend on experimental - * APIs, at the cost of some extra work during upgrades. However, it is - * generally inadvisable for <i>libraries</i> (which get included on users' - * class paths, outside the library developers' control) to do so. + * <p>Note that the presence of this annotation implies nothing about the quality or performance of + * the API in question, only the fact that the API or behavior may change in any way. 
*/ @Retention(RetentionPolicy.CLASS) @Target({ - ElementType.ANNOTATION_TYPE, - ElementType.CONSTRUCTOR, - ElementType.FIELD, - ElementType.METHOD, - ElementType.TYPE}) + ElementType.ANNOTATION_TYPE, + ElementType.CONSTRUCTOR, + ElementType.FIELD, + ElementType.METHOD, + ElementType.TYPE +}) @Documented public @interface Experimental { Kind value() default Kind.UNSPECIFIED; http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Internal.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Internal.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Internal.java index c359ef6..b0dccad 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Internal.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Internal.java @@ -26,9 +26,9 @@ import java.lang.annotation.Target; /** * Signifies that a publicly accessible API (public class, method or field) is intended for internal - * use and not for public consumption. Such an API is subject to incompatible changes or removal at - * any time. An API bearing this annotation is exempt from any compatibility guarantees made by its - * containing library. + * use only and not for public consumption. + * + * <p>Such an API is subject to incompatible changes or removal at any time. 
*/ @Retention(RetentionPolicy.CLASS) @Target({ http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java index 043fe93..7bcd532 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java @@ -22,12 +22,12 @@ import java.util.Collections; import java.util.List; /** - * A {@link Coder} that has no component {@link Coder Coders} or other state. + * A {@link Coder} that has no component {@link Coder Coders} or other configuration. * - * <p>Note that, unless the behavior is overridden, atomic coders are presumed to be deterministic. + * <p>Unless the behavior is overridden, atomic coders are presumed to be deterministic. * * <p>All atomic coders of the same class are considered to be equal to each other. As a result, - * an {@link AtomicCoder} should have no associated state. + * an {@link AtomicCoder} should have no associated configuration (instance variables, etc). * * @param <T> the type of the values being transcoded */ http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index dbd9b86..a07fca8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -49,15 +49,15 @@ public class Read { } /** - * Returns a new {@code Read.Unbounded} {@code PTransform} reading from the given - * {@code UnboundedSource}. 
+ * Returns a new {@link Read.Unbounded} {@link PTransform} reading from the given + * {@link UnboundedSource}. */ public static <T> Unbounded<T> from(UnboundedSource<T, ?> source) { return new Unbounded<>(null, source); } /** - * Helper class for building {@code Read} transforms. + * Helper class for building {@link Read} transforms. */ public static class Builder { private final String name; http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java index b0ce812..3375dc7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java @@ -22,7 +22,8 @@ import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableList; /** - * A registrar containing the default SDK options. + * A {@link PipelineOptionsRegistrar} containing the {@link PipelineOptions} subclasses available by + * default. 
*/ @AutoService(PipelineOptionsRegistrar.class) public class DefaultPipelineOptionsRegistrar implements PipelineOptionsRegistrar { http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java index 1220e6b..265c2ad 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java @@ -21,9 +21,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.auto.value.AutoValue; import java.lang.reflect.Method; -/** - * For internal use. Specification for an option defined in a {@link PipelineOptions} interface. - */ +/** Specification for an option defined in a {@link PipelineOptions} interface. */ @AutoValue abstract class PipelineOptionSpec { static PipelineOptionSpec of(Class<? 
extends PipelineOptions> clazz, String name, Method getter) { http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java index 030eed5..c7f1e09 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java @@ -40,13 +40,14 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.SerializableFunction; /** - * {@link ValueProvider} is an interface which abstracts the notion of - * fetching a value that may or may not be currently available. This can be - * used to parameterize transforms that only read values in at runtime, for - * example. + * A {@link ValueProvider} abstracts the notion of fetching a value that may or may not be currently + * available. + * + * <p>This can be used to parameterize transforms that only read values in at runtime, for example. */ @JsonSerialize(using = ValueProvider.Serializer.class) @JsonDeserialize(using = ValueProvider.Deserializer.class) @@ -264,8 +265,9 @@ public interface ValueProvider<T> extends Serializable { } /** - * Serializer for {@link ValueProvider}. + * <b>For internal use only; no backwards compatibility guarantees.</b> */ + @Internal class Serializer extends JsonSerializer<ValueProvider<?>> { @Override public void serialize(ValueProvider<?> value, JsonGenerator jgen, @@ -279,8 +281,9 @@ public interface ValueProvider<T> extends Serializable { } /** - * Deserializer for {@link ValueProvider}, which handles type marshalling. 
+ * <b>For internal use only; no backwards compatibility guarantees.</b> */ + @Internal class Deserializer extends JsonDeserializer<ValueProvider<?>> implements ContextualDeserializer { http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java index 10fba05..76d3e32 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java @@ -21,7 +21,11 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; /** - * State containing a bag values. Items can be added to the bag and the contents read out. + * A {@link ReadableState} cell containing a bag of values. Items can be added to the bag and the + * contents read out. + * + * <p>Implementations of this form of state are expected to implement {@link #add} efficiently, not + * via a sequence of read-modify-write. * * @param <T> The type of elements in the bag. 
*/ http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java index ddda255..94a36d3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java @@ -22,33 +22,32 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.transforms.Combine.CombineFn; /** - * State for a single value that is managed by a {@link CombineFn}. This is an internal extension - * to {@link GroupingState} that includes the {@code AccumT} type. + * A {@link ReadableState} cell defined by a {@link CombineFn}, accepting multiple input values, + * combining them as specified into accumulators, and producing a single output value. + * + * <p>Implementations of this form of state are expected to implement {@link #add} efficiently, not + * via a sequence of read-modify-write. * * @param <InputT> the type of values added to the state * @param <AccumT> the type of accumulator * @param <OutputT> the type of value extracted from the state */ @Experimental(Kind.STATE) -public interface CombiningState<InputT, AccumT, OutputT> - extends GroupingState<InputT, OutputT> { +public interface CombiningState<InputT, AccumT, OutputT> extends GroupingState<InputT, OutputT> { /** - * Read the merged accumulator for this combining value. It is implied that reading the - * state involes reading the accumulator, so {@link #readLater} is sufficient to prefetch for - * this. + * Read the merged accumulator for this state cell. It is implied that reading the state involves + * reading the accumulator, so {@link #readLater} is sufficient to prefetch for this. 
*/ AccumT getAccum(); /** - * Add an accumulator to this combining value. Depending on implementation this may immediately - * merge it with the previous accumulator, or may buffer this accumulator for a future merge. + * Add an accumulator to this state cell. Depending on implementation this may immediately merge + * it with the previous accumulator, or may buffer this accumulator for a future merge. */ void addAccum(AccumT accum); - /** - * Merge the given accumulators according to the underlying combiner. - */ + /** Merge the given accumulators according to the underlying {@link CombineFn}. */ AccumT mergeAccumulators(Iterable<AccumT> accumulators); @Override http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java index d99ff25..9c4c23e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java @@ -19,25 +19,24 @@ package org.apache.beam.sdk.state; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.GroupByKey; /** - * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single - * {@code OutputT} value. + * A {@link ReadableState} cell that combines multiple input values and outputs a single value of a + * different type. + * + * <p>This generalizes {@link GroupByKey} and {@link Combine} styles of grouping. 
* * @param <InputT> the type of values added to the state * @param <OutputT> the type of value extracted from the state */ @Experimental(Kind.STATE) public interface GroupingState<InputT, OutputT> extends ReadableState<OutputT>, State { - /** - * Add a value to the buffer. - */ + /** Add a value to the buffer. */ void add(InputT value); - /** - * Return true if this state is empty. - */ + /** Return true if this state is empty. */ ReadableState<Boolean> isEmpty(); @Override http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java index 649c3c7..17ea332 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java @@ -22,9 +22,10 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; /** - * An object that maps keys to values. - * A map cannot contain duplicate keys; - * each key can map to at most one value. + * A {@link State} cell mapping keys to values. + * + * <p>Implementations of this form of state are expected to implement map operations efficiently + * as supported by some associated backing key-value store. * * @param <K> the type of keys maintained by this map * @param <V> the type of mapped values @@ -32,9 +33,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; @Experimental(Kind.STATE) public interface MapState<K, V> extends State { - /** - * Associates the specified value with the specified key in this state. - */ + /** Associates the specified value with the specified key in this state.
*/ void put(K key, V value); /** @@ -48,9 +47,7 @@ public interface MapState<K, V> extends State { */ ReadableState<V> putIfAbsent(K key, V value); - /** - * Removes the mapping for a key from this map if it is present. - */ + /** Remove the mapping for a key from this map if it is present. */ void remove(K key); /** @@ -64,19 +61,13 @@ public interface MapState<K, V> extends State { */ ReadableState<V> get(K key); - /** - * Returns a iterable view of the keys contained in this map. - */ + /** Returns an {@link Iterable} over the keys contained in this map. */ ReadableState<Iterable<K>> keys(); - /** - * Returns a iterable view of the values contained in this map. - */ + /** Returns an {@link Iterable} over the values contained in this map. */ ReadableState<Iterable<V>> values(); - /** - * Returns a iterable view of all key-values. - */ + /** Returns an {@link Iterable} over the key-value pairs contained in this map. */ ReadableState<Iterable<Map.Entry<K, V>>> entries(); } http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java index b29ab26..70703ce 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java @@ -21,10 +21,10 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; /** - * {@link State} that can be read via {@link #read()}. + * A {@link State} that can be read via {@link #read()}. * - * <p>Use {@link #readLater()} for marking several states for prefetching. Runners - * can potentially batch these into one read. 
+ * <p>Use {@link #readLater()} for marking several states for prefetching. Runners can potentially + * batch these into one read. * * @param <T> The type of value returned by {@link #read}. */ @@ -33,17 +33,17 @@ public interface ReadableState<T> { /** * Read the current value, blocking until it is available. * - * <p>If there will be many calls to {@link #read} for different state in short succession, - * you should first call {@link #readLater} for all of them so the reads can potentially be - * batched (depending on the underlying implementation}. + * <p>If there will be many calls to {@link #read} for different state in short succession, you + * should first call {@link #readLater} for all of them so the reads can potentially be batched + * (depending on the underlying implementation). */ T read(); /** * Indicate that the value will be read later. * - * <p>This allows an implementation to start an asynchronous prefetch or - * to include this state in the next batch of reads. + * <p>This allows an implementation to start an asynchronous prefetch or to include this state in + * the next batch of reads. * * @return this for convenient chaining */ http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java index cb9a0e6..fd339b2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java @@ -21,16 +21,16 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; /** - * State containing no duplicate elements. - * Items can be added to the set and the contents read out.
+ * A {@link ReadableState} cell containing a set of elements. + * + * <p>Implementations of this form of state are expected to implement set operations such as {@link + * #contains(Object)} efficiently, reading as little of the overall set as possible. * * @param <T> The type of elements in the set. */ @Experimental(Kind.STATE) public interface SetState<T> extends GroupingState<T, Iterable<T>> { - /** - * Returns true if this set contains the specified element. - */ + /** Returns true if this set contains the specified element. */ ReadableState<Boolean> contains(T t); /** @@ -39,9 +39,7 @@ public interface SetState<T> extends GroupingState<T, Iterable<T>> { */ ReadableState<Boolean> addIfAbsent(T t); - /** - * Removes the specified element from this set if it is present. - */ + /** Removes the specified element from this set if it is present. */ void remove(T t); @Override http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java index 0c0ca32..63a9420 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java @@ -21,10 +21,10 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; /** - * Base interface for all state locations. + * A state cell, supporting a {@link #clear()} operation. * - * <p>Specific types of state add appropriate accessors for reading and writing values, see - * {@link ValueState}, {@link BagState}, and {@link GroupingState}. + * <p>Specific types of state add appropriate accessors for reading and writing values, see {@link + * ValueState}, {@link BagState}, and {@link GroupingState}. 
*/ @Experimental(Kind.STATE) public interface State { http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java index af834c3..29c7600 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; /** - * For internal use only; no backwards-compatibility guarantees. * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> * * <p>Visitor for binding a {@link StateSpec} and to the associated {@link State}. http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java index b7dd8e2..07bb05c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollectionView; /** - * For internal use only; no backwards-compatibility guarantees. 
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> */ @Internal http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java index 2e21a27..d617079 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java @@ -22,10 +22,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollectionView; -/** - * For internal use only; no backwards-compatibility guarantees. - * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> - */ +/** <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> */ @Internal public class StateContexts { private static final StateContext<BoundedWindow> NULL_CONTEXT = http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java index 8a3c87e..7b71384 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java @@ -31,9 +31,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -/** - * Static utility methods for creating {@link StateSpec} 
instances. - */ +/** Static methods for working with {@link StateSpec StateSpecs}. */ @Experimental(Kind.STATE) public class StateSpecs { @@ -41,30 +39,50 @@ public class StateSpecs { private StateSpecs() {} - /** Create a simple state spec for values of type {@code T}. */ + /** + * Create a {@link StateSpec} for a single value of type {@code T}. + * + * <p>This method attempts to infer the accumulator coder automatically. + * + * @see #value(Coder) + */ public static <T> StateSpec<ValueState<T>> value() { return new ValueStateSpec<>(null); } - /** Create a simple state spec for values of type {@code T}. */ + /** + * Identical to {@link #value()}, but with a coder explicitly supplied. + * + * <p>If automatic coder inference fails, use this method. + */ public static <T> StateSpec<ValueState<T>> value(Coder<T> valueCoder) { checkArgument(valueCoder != null, "valueCoder should not be null. Consider value() instead"); return new ValueStateSpec<>(valueCoder); } /** - * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple - * {@code InputT}s into a single {@code OutputT}. + * Create a {@link StateSpec} for a {@link CombiningState} which uses a {@link CombineFn} to + * automatically merge multiple values of type {@code InputT} into a single resulting {@code + * OutputT}. + * + * <p>This method attempts to infer the accumulator coder automatically. + * + * @see #combining(Coder, CombineFn) */ public static <InputT, AccumT, OutputT> - StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( - CombineFn<InputT, AccumT, OutputT> combineFn) { + StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( + CombineFn<InputT, AccumT, OutputT> combineFn) { return new CombiningStateSpec<InputT, AccumT, OutputT>(null, combineFn); } /** - * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge - * multiple {@code InputT}s into a single {@code OutputT}. 
+ * Create a {@link StateSpec} for a {@link CombiningState} which uses a {@link + * CombineFnWithContext} to automatically merge multiple values of type {@code InputT} into a + * single resulting {@code OutputT}. + * + * <p>This method attempts to infer the accumulator coder automatically. + * + * @see #combining(Coder, CombineFnWithContext) */ public static <InputT, AccumT, OutputT> StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( @@ -73,8 +91,9 @@ public class StateSpecs { } /** - * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple - * {@code InputT}s into a single {@code OutputT}. + * Identical to {@link #combining(CombineFn)}, but with an accumulator coder explicitly supplied. + * + * <p>If automatic coder inference fails, use this method. */ public static <InputT, AccumT, OutputT> StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( @@ -86,8 +105,10 @@ public class StateSpecs { } /** - * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge - * multiple {@code InputT}s into a single {@code OutputT}. + * Identical to {@link #combining(CombineFnWithContext)}, but with an accumulator coder explicitly + * supplied. + * + * <p>If automatic coder inference fails, use this method. */ public static <InputT, AccumT, OutputT> StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( @@ -96,43 +117,62 @@ public class StateSpecs { } /** - * Create a state spec that is optimized for adding values frequently, and occasionally retrieving - * all the values that have been added. + * Create a {@link StateSpec} for a {@link BagState}, optimized for adding values frequently + * and occasionally retrieving all the values that have been added. + * + * <p>This method attempts to infer the element coder automatically. 
+ * + * @see #bag(Coder) */ public static <T> StateSpec<BagState<T>> bag() { return bag(null); } /** - * Create a state spec that is optimized for adding values frequently, and occasionally retrieving - * all the values that have been added. + * Identical to {@link #bag()}, but with an element coder explicitly supplied. + * + * <p>If automatic coder inference fails, use this method. */ public static <T> StateSpec<BagState<T>> bag(Coder<T> elemCoder) { return new BagStateSpec<>(elemCoder); } /** - * Create a state spec that supporting for {@link java.util.Set} like access patterns. + * Create a {@link StateSpec} for a {@link SetState}, optimized for checking membership. + * + * <p>This method attempts to infer the element coder automatically. + * + * @see #set(Coder) */ public static <T> StateSpec<SetState<T>> set() { return set(null); } /** - * Create a state spec that supporting for {@link java.util.Set} like access patterns. + * Identical to {@link #set()}, but with an element coder explicitly supplied. + * + * <p>If automatic coder inference fails, use this method. */ public static <T> StateSpec<SetState<T>> set(Coder<T> elemCoder) { return new SetStateSpec<>(elemCoder); } /** - * Create a state spec that supporting for {@link java.util.Map} like access patterns. + * Create a {@link StateSpec} for a {@link MapState}, optimized for key lookups and writes. + * + * <p>This method attempts to infer the key and value coders automatically. + * + * @see #map(Coder, Coder) */ public static <K, V> StateSpec<MapState<K, V>> map() { return new MapStateSpec<>(null, null); } - /** Create a state spec that supporting for {@link java.util.Map} like access patterns. */ + /** + * Identical to {@link #map()}, but with key and value coders explicitly supplied. + * + * <p>If automatic coder inference fails, use this method. 
+ */ public static <K, V> StateSpec<MapState<K, V>> map(Coder<K> keyCoder, Coder<V> valueCoder) { return new MapStateSpec<>(keyCoder, valueCoder); } http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimeDomain.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimeDomain.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimeDomain.java index 0526453..1778047 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimeDomain.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimeDomain.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.state; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; /** * {@link TimeDomain} specifies whether an operation is based on timestamps of elements or current @@ -38,8 +39,11 @@ public enum TimeDomain { PROCESSING_TIME, /** - * Same as the {@link #PROCESSING_TIME} domain, except it won't fire a timer set for time <i>t</i> - * until all timers from earlier stages set for a time earlier than <i>t</i> have fired. + * <b>For internal use only; no backwards compatibility guarantees.</b> + * + * <p>Same as the {@link #PROCESSING_TIME} domain, except it won't fire a timer set for time + * <i>t</i> until all timers from earlier stages set for a time earlier than <i>t</i> have fired. 
*/ + @Internal SYNCHRONIZED_PROCESSING_TIME } http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java index 9458906..6f98ff8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java @@ -48,8 +48,8 @@ public interface Timer { * was already set, resets it to the new requested time. * * <p>For {@link TimeDomain#PROCESSING_TIME}, the behavior may be unpredictable, since processing - * time timers are ignored after a window has expired. Instead, it is recommended to use - * {@link #setRelative()}. + * time timers are ignored after a window has expired. Instead, it is recommended to use {@link + * #setRelative()}. */ void set(Instant absoluteTime); @@ -65,14 +65,12 @@ public interface Timer { */ void setRelative(); - /** - * Set the align offset. - */ + /** Offsets the target timestamp used by {@link #setRelative()} by the given duration. */ Timer offset(Duration offset); /** - * Aligns a timestamp to the next boundary of {@code period}. + * Aligns the target timestamp used by {@link #setRelative()} to the next boundary of {@code + * period}. 
*/ Timer align(Duration period); - } http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpecs.java index df42428..8b061eb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpecs.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerSpecs.java @@ -21,9 +21,7 @@ import com.google.auto.value.AutoValue; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -/** - * Static methods for working with {@link TimerSpec}. - */ +/** Static methods for working with {@link TimerSpec TimerSpecs}. */ @Experimental(Kind.TIMERS) public class TimerSpecs { http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java index ca97db2..0562c89 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java @@ -21,15 +21,13 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; /** - * State holding a single value. + * A {@link ReadableState} cell containing a single value. * - * @param <T> The type of values being stored. + * @param <T> The type of value being stored. */ @Experimental(Kind.STATE) public interface ValueState<T> extends ReadableState<T>, State { - /** - * Set the value of the buffer. - */ + /** Set the value. 
*/ void write(T input); @Override http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java index 38e2cbc..6d4183d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java @@ -24,8 +24,8 @@ import org.joda.time.Instant; /** * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> * - * <p>A {@link State} accepting and aggregating output timestamps, which determines the time to - * which the output watermark must be held. + * <p>A {@link State} accepting and aggregating timestamps according to a {@link TimestampCombiner}, + * holding the output watermark to the resulting timestamp. 
*/ @Internal public interface WatermarkHoldState extends GroupingState<Instant, Instant> { http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java index eb0a7ac..2e0c1f7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java @@ -27,9 +27,7 @@ import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.joda.time.Instant; -/** - * Create a {@link Trigger} that fires and finishes once after all of its sub-triggers have fired. - */ +/** A composite {@link Trigger} that fires when all of its sub-triggers are ready. */ @Experimental(Experimental.Kind.TRIGGER) public class AfterAll extends OnceTrigger { http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java index f0beb0a..f018019 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java @@ -27,8 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.joda.time.Instant; /** - * Create a composite {@link Trigger} that fires once after at least one of its sub-triggers have - * fired. 
+ * A composite {@link Trigger} that fires once after at least one of its sub-triggers has fired. */ @Experimental(Experimental.Kind.TRIGGER) public class AfterFirst extends OnceTrigger { http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java index eade95d..5914beb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java @@ -24,7 +24,8 @@ import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.joda.time.Instant; /** - * {@link Trigger}s that fire based on properties of the elements in the current pane. + * A {@link Trigger} that fires at some point after a specified number of input elements have + * arrived. 
*/ @Experimental(Experimental.Kind.TRIGGER) public class AfterPane extends OnceTrigger { http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java index cc7ec13..00e62be 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java @@ -30,8 +30,8 @@ import org.joda.time.format.PeriodFormat; import org.joda.time.format.PeriodFormatter; /** - * {@code AfterProcessingTime} triggers fire based on the current processing time. They operate in - * the real-time domain. + * A {@link Trigger} that fires at a specified point in processing time, relative to when + * input first arrives. 
*/ @Experimental(Experimental.Kind.TRIGGER) public class AfterProcessingTime extends OnceTrigger { http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java index 78f3735..ac00877 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java @@ -22,11 +22,13 @@ import org.apache.beam.sdk.annotations.Experimental; import org.joda.time.Instant; /** - * A trigger that is equivalent to {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. - * See {@link Repeatedly#forever} and {@link AfterWatermark#pastEndOfWindow} for more details. + * A trigger that is equivalent to {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. See + * {@link Repeatedly#forever} and {@link AfterWatermark#pastEndOfWindow} for more details. + * + * <p>This is a distinguished class to make it easy for runners to optimize for this common case. 
*/ @Experimental(Experimental.Kind.TRIGGER) -public class DefaultTrigger extends Trigger{ +public class DefaultTrigger extends Trigger { private DefaultTrigger() { super(); http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java index 400be1f..bc3438b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java @@ -23,7 +23,10 @@ import org.apache.beam.sdk.coders.Coder; import org.joda.time.Instant; /** - * Default {@link WindowFn} that assigns all data to the same window. + * A {@link WindowFn} that assigns all data to the same window. + * + * <p>This is the {@link WindowFn} used for data coming from a source, before a + * {@link Window} transform has been applied. 
*/ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> { http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java index 92041fc..49535a0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java @@ -36,7 +36,7 @@ public class InvalidWindows<W extends BoundedWindow> extends WindowFn<Object, W> } /** - * Returns the reason that this {@code WindowFn} is invalid. + * Returns the reason that this {@link WindowFn} is invalid. */ public String getCause() { return cause; http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java index 8314dd7..0a68021 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java @@ -21,10 +21,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import org.apache.beam.sdk.annotations.Internal; /** - * A utility function for merging overlapping {@link IntervalWindow}s. 
+ * <b>For internal use only; no backwards compatibility guarantees.</b> + * + * <p>A utility function for merging overlapping {@link IntervalWindow IntervalWindows}. */ +@Internal public class MergeOverlappingIntervalWindows { /** http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java index 6dfeea7..b9acd24 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -23,7 +23,7 @@ import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.joda.time.Instant; /** - * A trigger which never fires. + * A {@link Trigger} which never fires. * * <p>Using this trigger will only produce output when the watermark passes the end of the * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness allowed http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/NonMergingWindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/NonMergingWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/NonMergingWindowFn.java index d304a68..5cedc6b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/NonMergingWindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/NonMergingWindowFn.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.transforms.windowing; /** - * Abstract base class for {@link WindowFn}s that do not merge windows. 
+ * Abstract base class for {@link WindowFn WindowFns} that do not merge windows. * * @param <T> type of elements being windowed * @param <W> {@link BoundedWindow} subclass used to represent the windows used by this http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java index ad0de47..7eefb45 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java @@ -23,7 +23,10 @@ import java.util.List; import org.joda.time.Instant; /** - * Executes the {@code actual} trigger until it finishes or until the {@code until} trigger fires. + * A {@link Trigger} that executes according to its main trigger until its "finally" trigger fires. + * + * <p>Uniquely among triggers, the "finally" trigger's predicate is applied to all input seen so + * far, not input since the last firing. 
*/ public class OrFinallyTrigger extends Trigger { http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java index 78b79c7..43951a3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java @@ -22,7 +22,7 @@ import java.util.List; import org.joda.time.Instant; /** - * Repeat a trigger, either until some condition is met or forever. + * A {@link Trigger} that fires according to its subtrigger forever. * * <p>For example, to fire after the end of the window, and every time late data arrives: * <pre> {@code @@ -31,6 +31,8 @@ import org.joda.time.Instant; * * <p>{@code Repeatedly.forever(someTrigger)} behaves like an infinite * {@code AfterEach.inOrder(someTrigger, someTrigger, someTrigger, ...)}. + * + * <p>You can use {@link #orFinally(OnceTrigger)} to let another trigger interrupt the repetition. 
*/ public class Repeatedly extends Trigger { http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java index 5cc7c65..7390728 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java @@ -25,16 +25,17 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.joda.time.Duration; /** - * A {@link WindowFn} windowing values into sessions separated by {@link #gapDuration}-long - * periods with no elements. + * A {@link WindowFn} that windows values into sessions separated by periods with no input for at + * least the duration specified by {@link #getGapDuration()}. 
* - * <p>For example, in order to window data into session with at least 10 minute - * gaps in between them: - * <pre> {@code + * <p>For example, in order to window data into session with at least 10 minute gaps in between + * them: + * + * <pre>{@code * PCollection<Integer> pc = ...; * PCollection<Integer> windowed_pc = pc.apply( * Window.<Integer>into(Sessions.withGapDuration(Duration.standardMinutes(10)))); - * } </pre> + * }</pre> */ public class Sessions extends WindowFn<Object, IntervalWindow> { /** http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java index 39fe8a9..6cdcc3b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java @@ -24,10 +24,15 @@ import com.google.common.collect.Ordering; import java.util.Arrays; import java.util.Collections; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.GroupByKey; import org.joda.time.Instant; /** * Policies for combining timestamps that occur within a window. + * + * <p>In particular, these govern the timestamp on the output of a grouping transform + * such as {@link GroupByKey} or {@link Combine}. 
*/ @Experimental(Experimental.Kind.OUTPUT_TIME) public enum TimestampCombiner { http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 2777191..dc4863b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -37,7 +37,7 @@ import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; import org.joda.time.Duration; /** - * {@code Window} logically divides up or groups the elements of a + * {@link Window} logically divides up or groups the elements of a * {@link PCollection} into finite windows according to a {@link WindowFn}. * The output of {@code Window} contains the same elements as input, but they * have been logically assigned to windows. The next @@ -50,16 +50,16 @@ import org.joda.time.Duration; * * <h2>Windowing</h2> * - * <p>Windowing a {@code PCollection} divides the elements into windows based + * <p>Windowing a {@link PCollection} divides the elements into windows based * on the associated event time for each element. This is especially useful - * for {@code PCollection}s with unbounded size, since it allows operating on - * a sub-group of the elements placed into a related window. For {@code PCollection}s + * for {@link PCollection PCollections} with unbounded size, since it allows operating on + * a sub-group of the elements placed into a related window. For {@link PCollection PCollections} * with a bounded size (aka. conventional batch mode), by default, all data is - * implicitly in a single window, unless {@code Window} is applied. 
+ * implicitly in a single window, unless {@link Window} is applied. * * <p>For example, a simple form of windowing divides up the data into * fixed-width time intervals, using {@link FixedWindows}. - * The following example demonstrates how to use {@code Window} in a pipeline + * The following example demonstrates how to use {@link Window} in a pipeline * that counts the number of occurrences of strings each minute: * * <pre>{@code http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java index 5ebbb41..52ebd61 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java @@ -31,21 +31,21 @@ import org.joda.time.Instant; * The argument to the {@link Window} transform used to assign elements into * windows and to determine how windows are merged. See {@link Window} for more * information on how {@code WindowFn}s are used and for a library of - * predefined {@code WindowFn}s. + * predefined {@link WindowFn WindowFns}. * * <p>Users will generally want to use the predefined - * {@code WindowFn}s, but it is also possible to create new + * {@link WindowFn WindowFns}, but it is also possible to create new * subclasses. * - * <p>To create a custom {@code WindowFn}, inherit from this class and override all required + * <p>To create a custom {@link WindowFn}, inherit from this class and override all required * methods. If no merging is required, inherit from {@link NonMergingWindowFn} * instead. If no merging is required and each element is assigned to a single window, inherit from - * {@code PartitioningWindowFn}. 
Inheriting from the most specific subclass will enable more + * {@link PartitioningWindowFn}. Inheriting from the most specific subclass will enable more * optimizations in the runner. * * @param <T> type of elements being windowed * @param <W> {@link BoundedWindow} subclass used to represent the - * windows used by this {@code WindowFn} + * windows used by this {@link WindowFn} */ public abstract class WindowFn<T, W extends BoundedWindow> implements Serializable, HasDisplayData { http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java index c144aba..5da9db1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java @@ -19,15 +19,19 @@ package org.apache.beam.sdk.transforms.windowing; import java.io.Serializable; -import org.apache.beam.sdk.transforms.ParDo.MultiOutput; +import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Duration; /** - * A function that takes the windows of elements in a main input and maps them to the appropriate - * window in a {@link PCollectionView} consumed as a - * {@link MultiOutput#withSideInputs(PCollectionView[]) side input}. + * <b>Experimental! This will be ready for users eventually, but should be considered internal for + * now. 
No backwards compatibility guarantees.</b> + * + * <p>A function that takes the windows of elements in a main input and maps them to the appropriate + * window in a {@link PCollectionView} consumed as a {@link + * org.apache.beam.sdk.transforms.ParDo.MultiOutput#withSideInputs(PCollectionView[]) side input}. */ +@Experimental public abstract class WindowMappingFn<TargetWindowT extends BoundedWindow> implements Serializable { private final Duration maximumLookback; http://git-wip-us.apache.org/repos/asf/beam/blob/849f1225/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java index 04d1bdb..bf26e1b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PBegin.java @@ -21,37 +21,35 @@ import java.util.Collections; import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO.Read; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.PTransform; /** * {@link PBegin} is the "input" to a root {@link PTransform}, such as {@link Read Read} or - * {@link Create}. + * {@link org.apache.beam.sdk.transforms.Create}. * - * <p>Typically created by calling {@link Pipeline#begin} on a Pipeline. + * <p>Typically elided by simply calling {@link Pipeline#apply(String, PTransform)} or {@link + * Pipeline#apply(PTransform)}, but one can be explicitly created by calling {@link Pipeline#begin} + * on a Pipeline. */ public class PBegin implements PInput { - /** - * Returns a {@link PBegin} in the given {@link Pipeline}. - */ + /** Returns a {@link PBegin} in the given {@link Pipeline}. 
*/ public static PBegin in(Pipeline pipeline) { return new PBegin(pipeline); } /** - * Like {@link #apply(String, PTransform)} but defaulting to the name - * of the {@link PTransform}. + * Like {@link #apply(String, PTransform)} but defaulting to the name of the {@link PTransform}. */ - public <OutputT extends POutput> OutputT apply( - PTransform<? super PBegin, OutputT> t) { + public <OutputT extends POutput> OutputT apply(PTransform<? super PBegin, OutputT> t) { return Pipeline.applyTransform(this, t); } /** - * Applies the given {@link PTransform} to this input {@link PBegin}, - * using {@code name} to identify this specific application of the transform. - * This name is used in various places, including the monitoring UI, logging, - * and to stably identify this application node in the job graph. + * Applies the given {@link PTransform} to this {@link PBegin}, using {@code name} to identify + * this specific application of the transform. + * + * <p>This name is used in various places, including the monitoring UI, logging, and to stably + * identify this application node in the job graph. */ public <OutputT extends POutput> OutputT apply( String name, PTransform<? super PBegin, OutputT> t) {
