Repository: incubator-beam Updated Branches: refs/heads/master 61b9d723d -> 1a060f684
Make WindowedValue like an interface, allow external implementations Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d835317c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d835317c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d835317c Branch: refs/heads/master Commit: d835317c230087a056fecaaff1d2e8ea8c910c23 Parents: bd21ead Author: Kenneth Knowles <[email protected]> Authored: Wed Jun 22 07:40:38 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Fri Jun 24 14:47:25 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/util/WindowedValue.java | 181 ++++++++++++------- 1 file changed, 112 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d835317c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 1bbdbd9..f63a0d4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -60,9 +60,6 @@ import java.util.Set; */ public abstract class WindowedValue<T> { - protected final T value; - protected final PaneInfo pane; - /** * Returns a {@code WindowedValue} with the given value, timestamp, * and windows. @@ -147,11 +144,6 @@ public abstract class WindowedValue<T> { return new ValueInEmptyWindows<T>(value, pane); } - private WindowedValue(T value, PaneInfo pane) { - this.value = value; - this.pane = checkNotNull(pane); - } - /** * Returns a new {@code WindowedValue} that is a copy of this one, but with a different value, * which may have a new type {@code NewT}. @@ -161,9 +153,7 @@ public abstract class WindowedValue<T> { /** * Returns the value of this {@code WindowedValue}. */ - public T getValue() { - return value; - } + public abstract T getValue(); /** * Returns the timestamp of this {@code WindowedValue}. @@ -176,6 +166,11 @@ public abstract class WindowedValue<T> { public abstract Collection<? extends BoundedWindow> getWindows(); /** + * Returns the pane of this {@code WindowedValue} in its window. + */ + public abstract PaneInfo getPane(); + + /** * Returns a collection of {@link WindowedValue WindowedValues} identical to this one, except each * is in exactly one of the windows that this {@link WindowedValue} is in. */ @@ -187,18 +182,28 @@ public abstract class WindowedValue<T> { return windowedValues.build(); } - /** - * Returns the pane of this {@code WindowedValue} in its window. - */ - public PaneInfo getPane() { - return pane; - } - @Override - public abstract boolean equals(Object o); + public boolean equals(Object other) { + if (!(other instanceof WindowedValue)) { + return false; + } else { + WindowedValue<?> that = (WindowedValue<?>) other; + + // Compare timestamps first as they are most likely to differ. + // Also compare timestamps according to millis-since-epoch because otherwise expensive + // comparisons are made on their Chronology objects. + return this.getTimestamp().isEqual(that.getTimestamp()) + && Objects.equals(this.getValue(), that.getValue()) + && Objects.equals(this.getWindows(), that.getWindows()) + && Objects.equals(this.getPane(), that.getPane()); + } + } @Override - public abstract int hashCode(); + public int hashCode() { + // Hash only the millis of the timestamp to be consistent with equals + return Objects.hash(getValue(), getTimestamp().getMillis(), getWindows(), getPane()); + } @Override public abstract String toString(); @@ -207,11 +212,34 @@ public abstract class WindowedValue<T> { Collections.singletonList(GlobalWindow.INSTANCE); /** + * An abstract superclass for implementations of {@link WindowedValue} that stores the value + * and pane info. + */ + private abstract static class SimpleWindowedValue<T> extends WindowedValue<T> { + private final T value; + private final PaneInfo pane; + + protected SimpleWindowedValue(T value, PaneInfo pane) { + this.value = value; + this.pane = checkNotNull(pane); + } + + @Override + public PaneInfo getPane() { + return pane; + } + @Override + public T getValue() { + return value; + } + } + + /** * The abstract superclass of WindowedValue representations where * timestamp == MIN. */ private abstract static class MinTimestampWindowedValue<T> - extends WindowedValue<T> { + extends SimpleWindowedValue<T> { public MinTimestampWindowedValue(T value, PaneInfo pane) { super(value, pane); } @@ -233,8 +261,8 @@ public abstract class WindowedValue<T> { } @Override - public <NewT> WindowedValue<NewT> withValue(NewT value) { - return new ValueInGlobalWindow<>(value, pane); + public <NewT> WindowedValue<NewT> withValue(NewT newValue) { + return new ValueInGlobalWindow<>(newValue, getPane()); } @Override @@ -246,23 +274,23 @@ public abstract class WindowedValue<T> { public boolean equals(Object o) { if (o instanceof ValueInGlobalWindow) { ValueInGlobalWindow<?> that = (ValueInGlobalWindow<?>) o; - return Objects.equals(that.pane, this.pane) - && Objects.equals(that.value, this.value); + return Objects.equals(that.getPane(), this.getPane()) + && Objects.equals(that.getValue(), this.getValue()); } else { - return false; + return super.equals(o); } } @Override public int hashCode() { - return Objects.hash(value, pane); + return Objects.hash(getValue(), getPane()); } @Override public String toString() { return MoreObjects.toStringHelper(getClass()) - .add("value", value) - .add("pane", pane) + .add("value", getValue()) + .add("pane", getPane()) .toString(); } } @@ -278,8 +306,8 @@ public abstract class WindowedValue<T> { } @Override - public <NewT> WindowedValue<NewT> withValue(NewT value) { - return new ValueInEmptyWindows<>(value, pane); + public <NewT> WindowedValue<NewT> withValue(NewT newValue) { + return new ValueInEmptyWindows<>(newValue, getPane()); } @Override @@ -291,23 +319,23 @@ public abstract class WindowedValue<T> { public boolean equals(Object o) { if (o instanceof ValueInEmptyWindows) { ValueInEmptyWindows<?> that = (ValueInEmptyWindows<?>) o; - return Objects.equals(that.pane, this.pane) - && Objects.equals(that.value, this.value); + return Objects.equals(that.getPane(), this.getPane()) + && Objects.equals(that.getValue(), this.getValue()); } else { - return false; + return super.equals(o); } } @Override public int hashCode() { - return Objects.hash(value, pane); + return Objects.hash(getValue(), getPane()); } @Override public String toString() { return MoreObjects.toStringHelper(getClass()) - .add("value", value) - .add("pane", pane) + .add("value", getValue()) + .add("pane", getPane()) .toString(); } } @@ -317,8 +345,8 @@ public abstract class WindowedValue<T> { * timestamp is arbitrary. */ private abstract static class TimestampedWindowedValue<T> - extends WindowedValue<T> { - protected final Instant timestamp; + extends SimpleWindowedValue<T> { + private final Instant timestamp; public TimestampedWindowedValue(T value, Instant timestamp, @@ -346,8 +374,8 @@ public abstract class WindowedValue<T> { } @Override - public <NewT> WindowedValue<NewT> withValue(NewT value) { - return new TimestampedValueInGlobalWindow<>(value, timestamp, pane); + public <NewT> WindowedValue<NewT> withValue(NewT newValue) { + return new TimestampedValueInGlobalWindow<>(newValue, getTimestamp(), getPane()); } @Override @@ -360,25 +388,29 @@ public abstract class WindowedValue<T> { if (o instanceof TimestampedValueInGlobalWindow) { TimestampedValueInGlobalWindow<?> that = (TimestampedValueInGlobalWindow<?>) o; - return this.timestamp.isEqual(that.timestamp) // don't compare chronology objects - && Objects.equals(that.pane, this.pane) - && Objects.equals(that.value, this.value); + // Compare timestamps first as they are most likely to differ. + // Also compare timestamps according to millis-since-epoch because otherwise expensive + // comparisons are made on their Chronology objects. + return this.getTimestamp().isEqual(that.getTimestamp()) + && Objects.equals(that.getPane(), this.getPane()) + && Objects.equals(that.getValue(), this.getValue()); } else { - return false; + return super.equals(o); } } @Override public int hashCode() { - return Objects.hash(value, pane, timestamp.getMillis()); + // Hash only the millis of the timestamp to be consistent with equals + return Objects.hash(getValue(), getPane(), getTimestamp().getMillis()); } @Override public String toString() { return MoreObjects.toStringHelper(getClass()) - .add("value", value) - .add("timestamp", timestamp) - .add("pane", pane) + .add("value", getValue()) + .add("timestamp", getTimestamp()) + .add("pane", getPane()) .toString(); } } @@ -400,8 +432,8 @@ public abstract class WindowedValue<T> { } @Override - public <NewT> WindowedValue<NewT> withValue(NewT value) { - return new TimestampedValueInSingleWindow<>(value, timestamp, window, pane); + public <NewT> WindowedValue<NewT> withValue(NewT newValue) { + return new TimestampedValueInSingleWindow<>(newValue, getTimestamp(), window, getPane()); } @Override @@ -414,27 +446,31 @@ public abstract class WindowedValue<T> { if (o instanceof TimestampedValueInSingleWindow) { TimestampedValueInSingleWindow<?> that = (TimestampedValueInSingleWindow<?>) o; - return Objects.equals(that.value, this.value) - && this.timestamp.isEqual(that.timestamp) // don't compare chronology objects - && Objects.equals(that.pane, this.pane) + // Compare timestamps first as they are most likely to differ. + // Also compare timestamps according to millis-since-epoch because otherwise expensive + // comparisons are made on their Chronology objects. + return this.getTimestamp().isEqual(that.getTimestamp()) + && Objects.equals(that.getValue(), this.getValue()) + && Objects.equals(that.getPane(), this.getPane()) && Objects.equals(that.window, this.window); } else { - return false; + return super.equals(o); } } @Override public int hashCode() { - return Objects.hash(value, timestamp.getMillis(), pane, window); + // Hash only the millis of the timestamp to be consistent with equals + return Objects.hash(getValue(), getTimestamp().getMillis(), getPane(), window); } @Override public String toString() { return MoreObjects.toStringHelper(getClass()) - .add("value", value) - .add("timestamp", timestamp) + .add("value", getValue()) + .add("timestamp", getTimestamp()) .add("window", window) - .add("pane", pane) + .add("pane", getPane()) .toString(); } } @@ -457,8 +493,8 @@ public abstract class WindowedValue<T> { } @Override - public <NewT> WindowedValue<NewT> withValue(NewT value) { - return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane); + public <NewT> WindowedValue<NewT> withValue(NewT newValue) { + return new TimestampedValueInMultipleWindows<>(newValue, getTimestamp(), windows, getPane()); } @Override @@ -471,30 +507,37 @@ public abstract class WindowedValue<T> { if (o instanceof TimestampedValueInMultipleWindows) { TimestampedValueInMultipleWindows<?> that = (TimestampedValueInMultipleWindows<?>) o; - if (this.timestamp.isEqual(that.timestamp) // don't compare chronology objects - && Objects.equals(that.value, this.value) - && Objects.equals(that.pane, this.pane)) { + // Compare timestamps first as they are most likely to differ. + // Also compare timestamps according to millis-since-epoch because otherwise expensive + // comparisons are made on their Chronology objects. + if (this.getTimestamp().isEqual(that.getTimestamp()) + && Objects.equals(that.getValue(), this.getValue()) + && Objects.equals(that.getPane(), this.getPane())) { ensureWindowsAreASet(); that.ensureWindowsAreASet(); return that.windows.equals(this.windows); + } else { + return false; } + } else { + return super.equals(o); } - return false; } @Override public int hashCode() { + // Hash only the millis of the timestamp to be consistent with equals ensureWindowsAreASet(); - return Objects.hash(value, timestamp.getMillis(), pane, windows); + return Objects.hash(getValue(), getTimestamp().getMillis(), getPane(), windows); } @Override public String toString() { return MoreObjects.toStringHelper(getClass()) - .add("value", value) - .add("timestamp", timestamp) + .add("value", getValue()) + .add("timestamp", getTimestamp()) .add("windows", windows) - .add("pane", pane) + .add("pane", getPane()) .toString(); }
