Expose internal constructors for TestStream events. These are needed for deserialization from proto.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/47cea784 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/47cea784 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/47cea784 Branch: refs/heads/master Commit: 47cea78496a9a464d8cea7943a2f741c03692612 Parents: f5e30c5 Author: Kenneth Knowles <[email protected]> Authored: Thu Jun 1 19:17:58 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Fri Jun 2 10:06:52 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/testing/TestStream.java | 60 ++++++++++++++------ 1 file changed, 44 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/47cea784/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java index d41b9ef..9ad8fd8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java @@ -24,8 +24,10 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableList; import java.util.List; +import java.util.Objects; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -86,8 +88,8 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> { /** * Adds the specified elements to the source with timestamp 
equal to the current watermark. * - * @return A {@link TestStream.Builder} like this one that will add the provided elements - * after all earlier events have completed. + * @return A {@link TestStream.Builder} like this one that will add the provided elements after + * all earlier events have completed. */ @SafeVarargs public final Builder<T> addElements(T element, T... elements) { @@ -103,8 +105,8 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> { /** * Adds the specified elements to the source with the provided timestamps. * - * @return A {@link TestStream.Builder} like this one that will add the provided elements - * after all earlier events have completed. + * @return A {@link TestStream.Builder} like this one that will add the provided elements after + * all earlier events have completed. */ @SafeVarargs public final Builder<T> addElements( @@ -136,7 +138,7 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> { * BoundedWindow#TIMESTAMP_MAX_VALUE} or beyond. * * @return A {@link TestStream.Builder} like this one that will advance the watermark to the - * specified point after all earlier events have completed. + * specified point after all earlier events have completed. */ public Builder<T> advanceWatermarkTo(Instant newWatermark) { checkArgument( @@ -146,10 +148,11 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> { "The Watermark cannot progress beyond the maximum. Got: %s. 
Maximum: %s", newWatermark, BoundedWindow.TIMESTAMP_MAX_VALUE); - ImmutableList<Event<T>> newEvents = ImmutableList.<Event<T>>builder() - .addAll(events) - .add(WatermarkEvent.<T>advanceTo(newWatermark)) - .build(); + ImmutableList<Event<T>> newEvents = + ImmutableList.<Event<T>>builder() + .addAll(events) + .add(WatermarkEvent.<T>advanceTo(newWatermark)) + .build(); return new Builder<T>(coder, newEvents, newWatermark); } @@ -157,7 +160,7 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> { * Advance the processing time by the specified amount. * * @return A {@link TestStream.Builder} like this one that will advance the processing time by - * the specified amount after all earlier events have completed. + * the specified amount after all earlier events have completed. */ public Builder<T> advanceProcessingTime(Duration amount) { checkArgument( @@ -194,9 +197,7 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> { EventType getType(); } - /** - * The types of {@link Event} that are supported by {@link TestStream}. - */ + /** The types of {@link Event} that are supported by {@link TestStream}. 
*/ public enum EventType { ELEMENT, WATERMARK, @@ -213,7 +214,11 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> { return add(ImmutableList.<TimestampedValue<T>>builder().add(element).add(elements).build()); } - static <T> Event<T> add(Iterable<TimestampedValue<T>> elements) { + /** + * <b>For internal use only: no backwards compatibility guarantees.</b> + */ + @Internal + public static <T> Event<T> add(Iterable<TimestampedValue<T>> elements) { return new AutoValue_TestStream_ElementEvent<>(EventType.ELEMENT, elements); } } @@ -223,7 +228,11 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> { public abstract static class WatermarkEvent<T> implements Event<T> { public abstract Instant getWatermark(); - static <T> Event<T> advanceTo(Instant newWatermark) { + /** + * <b>For internal use only: no backwards compatibility guarantees.</b> + */ + @Internal + public static <T> Event<T> advanceTo(Instant newWatermark) { return new AutoValue_TestStream_WatermarkEvent<>(EventType.WATERMARK, newWatermark); } } @@ -233,7 +242,11 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> { public abstract static class ProcessingTimeEvent<T> implements Event<T> { public abstract Duration getProcessingTimeAdvance(); - static <T> Event<T> advanceBy(Duration amount) { + /** + * <b>For internal use only: no backwards compatibility guarantees.</b> + */ + @Internal + public static <T> Event<T> advanceBy(Duration amount) { return new AutoValue_TestStream_ProcessingTimeEvent<>(EventType.PROCESSING_TIME, amount); } } @@ -257,4 +270,19 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> { public List<Event<T>> getEvents() { return events; } + + @Override + public boolean equals(Object other) { + if (!(other instanceof TestStream)) { + return false; + } + TestStream<?> that = (TestStream<?>) other; + + return getValueCoder().equals(that.getValueCoder()) && 
getEvents().equals(that.getEvents()); + } + + @Override + public int hashCode() { + return Objects.hash(TestStream.class, getValueCoder(), getEvents()); + } }
