Make TimerData#compareTo consistent with equals Timers are equal if the domain, timestamp, and namespace are equal. Compare these values in compareTo. The ordering of TimerData that are not in the same namespace or domain is arbitrary.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9fe6ce22 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9fe6ce22 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9fe6ce22 Branch: refs/heads/gearpump-runner Commit: 9fe6ce22ca12b48704deb0e7cf3c583dff9b1870 Parents: fc87a0c Author: Thomas Groh <[email protected]> Authored: Wed Aug 10 13:52:14 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Wed Aug 10 13:55:54 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/util/TimerInternals.java | 15 ++++++- .../beam/sdk/util/TimerInternalsTest.java | 47 ++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9fe6ce22/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java index 3212d64..eb49b9d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import com.google.common.base.MoreObjects; +import com.google.common.collect.ComparisonChain; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -205,9 +206,21 @@ public interface TimerInternals { .toString(); } + /** + * {@inheritDoc}. + * + * <p>The ordering of {@link TimerData} that are not in the same namespace or domain is + * arbitrary. + */ @Override public int compareTo(TimerData o) { - return Long.compare(timestamp.getMillis(), o.getTimestamp().getMillis()); + ComparisonChain chain = + ComparisonChain.start().compare(timestamp, o.getTimestamp()).compare(domain, o.domain); + if (chain.result() == 0) { + // Obtaining the stringKey may be expensive; only do so if required + chain = chain.compare(namespace.stringKey(), o.namespace.stringKey()); + } + return chain.result(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9fe6ce22/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java index baf911a..bc2930c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java @@ -17,12 +17,18 @@ */ package org.apache.beam.sdk.util; +import static org.hamcrest.Matchers.comparesEqualTo; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.TimerInternals.TimerDataCoder; +import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.joda.time.Instant; @@ -50,4 +56,45 @@ public class TimerInternalsTest { windowCoder, new IntervalWindow(new Instant(0), new Instant(100))), new Instant(99), TimeDomain.PROCESSING_TIME)); } + + @Test + public void testCompareTo() { + Instant firstTimestamp = new Instant(100); + Instant secondTimestamp = new Instant(200); + IntervalWindow firstWindow = new IntervalWindow(new Instant(0), firstTimestamp); + IntervalWindow secondWindow = new IntervalWindow(firstTimestamp, secondTimestamp); + Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); + + StateNamespace firstWindowNs = StateNamespaces.window(windowCoder, firstWindow); + StateNamespace secondWindowNs = StateNamespaces.window(windowCoder, secondWindow); + + TimerData firstEventTime = TimerData.of(firstWindowNs, firstTimestamp, TimeDomain.EVENT_TIME); + TimerData secondEventTime = TimerData.of(firstWindowNs, secondTimestamp, TimeDomain.EVENT_TIME); + TimerData thirdEventTime = TimerData.of(secondWindowNs, secondTimestamp, TimeDomain.EVENT_TIME); + + TimerData firstProcTime = + TimerData.of(firstWindowNs, firstTimestamp, TimeDomain.PROCESSING_TIME); + TimerData secondProcTime = + TimerData.of(firstWindowNs, secondTimestamp, TimeDomain.PROCESSING_TIME); + TimerData thirdProcTime = + TimerData.of(secondWindowNs, secondTimestamp, TimeDomain.PROCESSING_TIME); + + assertThat(firstEventTime, + comparesEqualTo(TimerData.of(firstWindowNs, firstTimestamp, TimeDomain.EVENT_TIME))); + assertThat(firstEventTime, lessThan(secondEventTime)); + assertThat(secondEventTime, lessThan(thirdEventTime)); + assertThat(firstEventTime, lessThan(thirdEventTime)); + + assertThat(secondProcTime, + comparesEqualTo(TimerData.of(firstWindowNs, secondTimestamp, TimeDomain.PROCESSING_TIME))); + assertThat(firstProcTime, lessThan(secondProcTime)); + assertThat(secondProcTime, lessThan(thirdProcTime)); + assertThat(firstProcTime, lessThan(thirdProcTime)); + + assertThat(firstEventTime, not(comparesEqualTo(firstProcTime))); + assertThat(firstProcTime, + not(comparesEqualTo(TimerData.of(firstWindowNs, + firstTimestamp, + TimeDomain.SYNCHRONIZED_PROCESSING_TIME)))); + } }
