Tighten access control and internal annotations for triggers
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/49cf433c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/49cf433c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/49cf433c Branch: refs/heads/master Commit: 49cf433c5c08f3cc91512aa9544a36a5d3e84333 Parents: c1b26a1 Author: Kenneth Knowles <[email protected]> Authored: Tue May 2 19:59:32 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu May 4 06:09:31 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/transforms/windowing/AfterAll.java | 4 +- .../sdk/transforms/windowing/AfterEach.java | 2 +- .../sdk/transforms/windowing/AfterFirst.java | 2 +- .../sdk/transforms/windowing/AfterPane.java | 2 +- .../windowing/AfterProcessingTime.java | 2 +- .../transforms/windowing/AfterWatermark.java | 4 +- .../transforms/windowing/DefaultTrigger.java | 2 +- .../beam/sdk/transforms/windowing/Never.java | 2 +- .../transforms/windowing/OrFinallyTrigger.java | 2 +- .../sdk/transforms/windowing/Repeatedly.java | 2 +- .../windowing/TimestampTransform.java | 41 ++++++++++++++++---- .../beam/sdk/transforms/windowing/Trigger.java | 18 +++++++-- 12 files changed, 62 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/49cf433c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java index 2747311..eb0a7ac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java @@ -23,6 +23,7 @@ import com.google.common.base.Joiner; import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.joda.time.Instant; @@ -51,6 +52,7 @@ public class AfterAll extends OnceTrigger { return new AfterAll(triggers); } + @Internal @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger will fire after the latest of its sub-triggers. @@ -65,7 +67,7 @@ public class AfterAll extends OnceTrigger { } @Override - public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) { + protected OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) { return new AfterAll(continuationTriggers); } http://git-wip-us.apache.org/repos/asf/beam/blob/49cf433c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java index 56a9d14..1fc4fbf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java @@ -72,7 +72,7 @@ public class AfterEach extends Trigger { } @Override - public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { + protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { return Repeatedly.forever(new AfterFirst(continuationTriggers)); } http://git-wip-us.apache.org/repos/asf/beam/blob/49cf433c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java index 79fd639..f0beb0a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java @@ -66,7 +66,7 @@ public class AfterFirst extends OnceTrigger { } @Override - public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) { + protected OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) { return new AfterFirst(continuationTriggers); } http://git-wip-us.apache.org/repos/asf/beam/blob/49cf433c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java index 25c5593..eade95d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java @@ -61,7 +61,7 @@ public class AfterPane extends OnceTrigger { } @Override - public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) { + protected OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) { return AfterPane.elementCountAtLeast(1); } http://git-wip-us.apache.org/repos/asf/beam/blob/49cf433c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java index eda269a..cc7ec13 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java @@ -40,7 +40,7 @@ public class AfterProcessingTime extends OnceTrigger { private final List<TimestampTransform> timestampTransforms; - public AfterProcessingTime(List<TimestampTransform> timestampTransforms) { + private AfterProcessingTime(List<TimestampTransform> timestampTransforms) { super(null); this.timestampTransforms = timestampTransforms; } http://git-wip-us.apache.org/repos/asf/beam/blob/49cf433c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java index 6825ab0..14a8c98 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java @@ -88,7 +88,7 @@ public class AfterWatermark { } @SuppressWarnings("unchecked") - public AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) { + private AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) { super(lateTrigger == null ? ImmutableList.<Trigger>of(earlyTrigger) : ImmutableList.<Trigger>of(earlyTrigger, lateTrigger)); @@ -178,7 +178,7 @@ public class AfterWatermark { } @Override - public FromEndOfWindow getContinuationTrigger(List<Trigger> continuationTriggers) { + protected FromEndOfWindow getContinuationTrigger(List<Trigger> continuationTriggers) { return this; } http://git-wip-us.apache.org/repos/asf/beam/blob/49cf433c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java index a649b4f..78f3735 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java @@ -51,7 +51,7 @@ public class DefaultTrigger extends Trigger{ } @Override - public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { + protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { return this; } } http://git-wip-us.apache.org/repos/asf/beam/blob/49cf433c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java index 664ae83..6dfeea7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -43,7 +43,7 @@ public final class Never { * The actual trigger class for {@link Never} triggers. */ public static class NeverTrigger extends OnceTrigger { - protected NeverTrigger() { + private NeverTrigger() { super(null); } http://git-wip-us.apache.org/repos/asf/beam/blob/49cf433c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java index 1ed9b55..ad0de47 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java @@ -58,7 +58,7 @@ public class OrFinallyTrigger extends Trigger { } @Override - public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { + protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { // Use OrFinallyTrigger instead of AfterFirst because the continuation of ACTUAL // may not be a OnceTrigger. return Repeatedly.forever( http://git-wip-us.apache.org/repos/asf/beam/blob/49cf433c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java index 40591e3..78b79c7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java @@ -66,7 +66,7 @@ public class Repeatedly extends Trigger { } @Override - public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { + protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { return new Repeatedly(continuationTriggers.get(REPEATED)); } http://git-wip-us.apache.org/repos/asf/beam/blob/49cf433c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java index 5318592..8bdf6ee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java @@ -19,37 +19,59 @@ package org.apache.beam.sdk.transforms.windowing; import com.google.auto.value.AutoValue; import java.io.Serializable; +import org.apache.beam.sdk.annotations.Internal; import org.joda.time.Duration; import org.joda.time.Instant; -/** An abstract description of a standardized transformation on timestamps. */ +/** + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * + * <p>An abstract description of a standardized transformation on timestamps. + */ +@Internal public abstract class TimestampTransform implements Serializable{ - /** Returns a transform that shifts a timestamp later by {@code delay}. */ + TimestampTransform() {} + + /** + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * + * <p>Returns a transform that shifts a timestamp later by {@code delay}. + */ + @Internal public static TimestampTransform delay(Duration delay) { return new AutoValue_TimestampTransform_Delay(delay); } /** - * Returns a transform that aligns a timestamp to the next boundary of {@code period}, starting + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * + * <p>Returns a transform that aligns a timestamp to the next boundary of {@code period}, starting * from {@code offset}. */ + @Internal public static TimestampTransform alignTo(Duration period, Instant offset) { return new AutoValue_TimestampTransform_AlignTo(period, offset); } /** - * Returns a transform that aligns a timestamp to the next boundary of {@code period}, starting + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * + * <p>Returns a transform that aligns a timestamp to the next boundary of {@code period}, starting * from the start of the epoch. */ + @Internal public static TimestampTransform alignTo(Duration period) { return alignTo(period, new Instant(0)); } /** - * Represents the transform that aligns a timestamp to the next boundary of {@link #getPeriod()} - * start at {@link #getOffset()}. + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * + * <p>Represents the transform that aligns a timestamp to the next boundary of {@link + * #getPeriod()} start at {@link #getOffset()}. */ + @Internal @AutoValue public abstract static class AlignTo extends TimestampTransform { public abstract Duration getPeriod(); @@ -57,7 +79,12 @@ public abstract class TimestampTransform implements Serializable{ public abstract Instant getOffset(); } - /** Represents the transform that delays a timestamp by {@link #getDelay()}. */ + /** + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * + * <p>Represents the transform that delays a timestamp by {@link #getDelay()}. + */ + @Internal @AutoValue public abstract static class Delay extends TimestampTransform { public abstract Duration getDelay(); http://git-wip-us.apache.org/repos/asf/beam/blob/49cf433c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java index 07d3077..519ab67 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.GroupByKey; import org.joda.time.Instant; @@ -117,8 +118,10 @@ public abstract class Trigger implements Serializable { protected abstract Trigger getContinuationTrigger(List<Trigger> continuationTriggers); /** - * Returns a bound in event time by which this trigger would have fired at least once for a given - * window had there been input data. + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * + * <p>Returns a bound in event time by which this trigger would have fired at least once for a + * given window had there been input data. * * <p>For triggers that do not fire based on the watermark advancing, returns {@link * BoundedWindow#TIMESTAMP_MAX_VALUE}. @@ -126,9 +129,15 @@ public abstract class Trigger implements Serializable { * <p>This estimate may be used, for example, to determine that there are no elements in a * side-input window, which causes the default value to be used instead. */ + @Internal public abstract Instant getWatermarkThatGuaranteesFiring(BoundedWindow window); - /** Returns whether this performs the same triggering as the given {@link Trigger}. */ + /** + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * + * <p>Returns whether this performs the same triggering as the given {@link Trigger}. + */ + @Internal public boolean isCompatible(Trigger other) { if (!getClass().equals(other.getClass())) { return false; @@ -208,9 +217,12 @@ public abstract class Trigger implements Serializable { } /** + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * * {@link Trigger Triggers} that are guaranteed to fire at most once should extend {@link * OnceTrigger} rather than the general {@link Trigger} class to indicate that behavior. */ + @Internal public abstract static class OnceTrigger extends Trigger { protected OnceTrigger(List<Trigger> subTriggers) { super(subTriggers);
