Add Latest CombineFn and PTransforms Add DoFnTester support for specifying input timestamps
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6ee7b620 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6ee7b620 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6ee7b620 Branch: refs/heads/gearpump-runner Commit: 6ee7b620bf8e2ee07c0f30e9ff20363e79765405 Parents: 28ad44d Author: Scott Wegner <sweg...@google.com> Authored: Thu Aug 18 13:56:34 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Sep 12 17:40:11 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/coders/NullableCoder.java | 7 + .../apache/beam/sdk/transforms/DoFnTester.java | 33 ++- .../org/apache/beam/sdk/transforms/Latest.java | 203 ++++++++++++++++ .../beam/sdk/values/TimestampedValue.java | 14 ++ .../beam/sdk/transforms/DoFnTesterTest.java | 34 ++- .../beam/sdk/transforms/LatestFnTests.java | 233 +++++++++++++++++++ .../apache/beam/sdk/transforms/LatestTest.java | 146 ++++++++++++ .../beam/sdk/values/TimestampedValueTest.java | 83 +++++++ 8 files changed, 747 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java index 44aadbd..9c6c7c0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java @@ -65,6 +65,13 @@ public class NullableCoder<T> extends StandardCoder<T> { this.valueCoder = valueCoder; } + /** + * Returns the inner {@link Coder} wrapped by this {@link NullableCoder} 
instance. + */ + public Coder<T> getValueCoder() { + return valueCoder; + } + @Override public void encode(@Nullable T value, OutputStream outStream, Context context) throws IOException, CoderException { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index b867a55..0e018ba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -17,6 +17,9 @@ */ package org.apache.beam.sdk.transforms; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; @@ -221,9 +224,26 @@ public class DoFnTester<InputT, OutputT> { * been finished */ public void processElement(InputT element) throws Exception { - if (state == State.FINISHED) { - throw new IllegalStateException("finishBundle() has already been called"); - } + processTimestampedElement(TimestampedValue.atMinimumTimestamp(element)); + } + + /** + * Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a + * context where {@link OldDoFn.ProcessContext#element} returns the + * given element and timestamp. + * + * <p>Will call {@link #startBundle} automatically, if it hasn't + * already been called. + * + * <p>{@code element} must be non-null. Use {@link #processElement} for the minimum timestamp.
+ * + * @throws IllegalStateException if the {@code OldDoFn} under test has already + * been finished + */ + public void processTimestampedElement(TimestampedValue<InputT> element) throws Exception { + checkNotNull(element, "Timestamped element cannot be null"); + checkState(state != State.FINISHED, "finishBundle() has already been called"); + if (state == State.UNSTARTED) { startBundle(); } @@ -522,10 +542,13 @@ public class DoFnTester<InputT, OutputT> { private TestProcessContext<InputT, OutputT> createProcessContext( OldDoFn<InputT, OutputT> fn, - InputT elem) { + TimestampedValue<InputT> elem) { + WindowedValue<InputT> windowedValue = WindowedValue.timestampedValueInGlobalWindow( + elem.getValue(), elem.getTimestamp()); + return new TestProcessContext<>(fn, createContext(fn), - WindowedValue.valueInGlobalWindow(elem), + windowedValue, mainOutputTag, sideInputs); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java new file mode 100644 index 0000000..7f13649 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.util.Iterator; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; + +/** + * {@link PTransform} and {@link Combine.CombineFn} for computing the latest element + * in a {@link PCollection}. + * + * <p>Example 1: compute the latest value for each session: + * <pre><code> + * PCollection<Long> input = ...; + * PCollection<Long> sessioned = input + * .apply(Window.<Long>into(Sessions.withGapDuration(Duration.standardMinutes(5)))); + * PCollection<Long> latestValues = sessioned.apply(Latest.<Long>globally()); + * </code></pre> + * + * <p>Example 2: track the latest computed value in an aggregator: + * <pre><code> + * class MyDoFn extends DoFn<String, String> { + * private Aggregator<TimestampedValue<Double>, Double> latestValue = + * createAggregator("latestValue", new Latest.LatestFn<Double>()); + * + * {@literal @}ProcessElement + * public void processElement(ProcessContext c) { + * double val = // ..
+ * latestValue.addValue(TimestampedValue.of(val, c.timestamp())); + * // .. + * } + * } + * </code></pre> + * + * <p>For elements with the same timestamp, the element chosen for output is arbitrary. + */ +public class Latest { + // Do not instantiate + private Latest() {} + + /** + * A {@link Combine.CombineFn} that computes the latest element from a set of inputs. This is + * particularly useful as an {@link Aggregator}. + * + * @param <T> Type of input element. + * @see Latest + */ + public static class LatestFn<T> + extends Combine.CombineFn<TimestampedValue<T>, TimestampedValue<T>, T> { + /** Construct a new {@link LatestFn} instance. */ + public LatestFn() {} + + @Override + public TimestampedValue<T> createAccumulator() { + return TimestampedValue.atMinimumTimestamp(null); + } + + @Override + public TimestampedValue<T> addInput(TimestampedValue<T> accumulator, + TimestampedValue<T> input) { + checkNotNull(accumulator, "accumulator must be non-null"); + checkNotNull(input, "input must be non-null"); + + if (input.getTimestamp().isBefore(accumulator.getTimestamp())) { + return accumulator; + } else { + return input; + } + } + + @Override + public Coder<TimestampedValue<T>> getAccumulatorCoder(CoderRegistry registry, + Coder<TimestampedValue<T>> inputCoder) throws CannotProvideCoderException { + return NullableCoder.of(inputCoder); + } + + @Override + public Coder<T> getDefaultOutputCoder(CoderRegistry registry, + Coder<TimestampedValue<T>> inputCoder) throws CannotProvideCoderException { + checkState(inputCoder instanceof TimestampedValue.TimestampedValueCoder, + "inputCoder must be a TimestampedValueCoder, but was %s", inputCoder); + + TimestampedValue.TimestampedValueCoder<T> inputTVCoder = + (TimestampedValue.TimestampedValueCoder<T>) inputCoder; + return NullableCoder.of(inputTVCoder.<T>getValueCoder()); + } + + @Override + public TimestampedValue<T> mergeAccumulators(Iterable<TimestampedValue<T>> accumulators) { + checkNotNull(accumulators, "accumulators 
must be non-null"); + + Iterator<TimestampedValue<T>> iter = accumulators.iterator(); + if (!iter.hasNext()) { + return createAccumulator(); + } + + TimestampedValue<T> merged = iter.next(); + while (iter.hasNext()) { + merged = addInput(merged, iter.next()); + } + + return merged; + } + + @Override + public T extractOutput(TimestampedValue<T> accumulator) { + return accumulator.getValue(); + } + } + + /** + * Returns a {@link PTransform} that takes as input a {@code PCollection<T>} and returns a + * {@code PCollection<T>} containing the latest element according to its event time, or + * {@literal null} if there are no elements. + * + * @param <T> The type of the elements being combined. + */ + public static <T> PTransform<PCollection<T>, PCollection<T>> globally() { + return new Globally<>(); + } + + /** + * Returns a {@link PTransform} that takes as input a {@code PCollection<KV<K, V>>} and returns a + * {@code PCollection<KV<K, V>>} containing the latest element for each key according to its + * event time. + * + * @param <K> The key type of the elements being combined. + * @param <V> The value type of the elements being combined.
+ */ + public static <K, V> PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> perKey() { + return new PerKey<>(); + } + + private static class Globally<T> extends PTransform<PCollection<T>, PCollection<T>> { + @Override + public PCollection<T> apply(PCollection<T> input) { + Coder<T> inputCoder = input.getCoder(); + + return input + .apply("Reify Timestamps", ParDo.of( + new DoFn<T, TimestampedValue<T>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(TimestampedValue.of(c.element(), c.timestamp())); + } + })).setCoder(TimestampedValue.TimestampedValueCoder.of(inputCoder)) + .apply("Latest Value", Combine.globally(new LatestFn<T>())) + .setCoder(NullableCoder.of(inputCoder)); + } + } + + private static class PerKey<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> { + @Override + public PCollection<KV<K, V>> apply(PCollection<KV<K, V>> input) { + checkNotNull(input); + checkArgument(input.getCoder() instanceof KvCoder, + "Input specifiedCoder must be an instance of KvCoder, but was %s", input.getCoder()); + + @SuppressWarnings("unchecked") + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + return input + .apply("Reify Timestamps", ParDo.of( + new DoFn<KV<K, V>, KV<K, TimestampedValue<V>>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(KV.of(c.element().getKey(), TimestampedValue.of(c.element().getValue(), + c.timestamp()))); + } + })).setCoder(KvCoder.of( + inputCoder.getKeyCoder(), + TimestampedValue.TimestampedValueCoder.of(inputCoder.getValueCoder()))) + .apply("Latest Value", Combine.<K, TimestampedValue<V>, V>perKey(new LatestFn<V>())) + .setCoder(inputCoder); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java index f2ad616..dd80fb2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java @@ -31,6 +31,7 @@ import java.util.Objects; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.PropertyNames; import org.joda.time.Instant; @@ -43,6 +44,13 @@ import org.joda.time.Instant; * @param <V> the type of the value */ public class TimestampedValue<V> { + /** + * Returns a new {@link TimestampedValue} with the + * {@link BoundedWindow#TIMESTAMP_MIN_VALUE minimum timestamp}. + */ + public static <V> TimestampedValue<V> atMinimumTimestamp(V value) { + return of(value, BoundedWindow.TIMESTAMP_MIN_VALUE); + } /** * Returns a new {@code TimestampedValue} with the given value and timestamp. 
@@ -136,6 +144,10 @@ public class TimestampedValue<V> { return Arrays.<Coder<?>>asList(valueCoder); } + public Coder<T> getValueCoder() { + return valueCoder; + } + public static <T> List<Object> getInstanceComponents(TimestampedValue<T> exampleValue) { return Arrays.<Object>asList(exampleValue.getValue()); } @@ -147,6 +159,8 @@ public class TimestampedValue<V> { private final Instant timestamp; protected TimestampedValue(V value, Instant timestamp) { + checkNotNull(timestamp, "timestamp must be non-null"); + this.value = value; this.timestamp = timestamp; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index 2649be5..3ed30fd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.transforms; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItems; @@ -35,7 +36,9 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.hamcrest.Matchers; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -44,6 +47,7 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class DoFnTesterTest { + @Rule public ExpectedException thrown = ExpectedException.none(); @Test public void processElement() throws Exception { @@ -126,6 +130,16 
@@ public class DoFnTesterTest { } @Test + public void processElementAfterFinish() throws Exception { + DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn()); + tester.finishBundle(); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("finishBundle() has already been called"); + tester.processElement(1L); + } + + @Test public void processBatch() throws Exception { CounterDoFn counterDoFn = new CounterDoFn(); DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn); @@ -145,7 +159,25 @@ public class DoFnTesterTest { } @Test - public void processElementWithTimestamp() throws Exception { + public void processTimestampedElement() throws Exception { + DoFn<Long, TimestampedValue<Long>> reifyTimestamps = new ReifyTimestamps(); + + DoFnTester<Long, TimestampedValue<Long>> tester = DoFnTester.of(reifyTimestamps); + + TimestampedValue<Long> input = TimestampedValue.of(1L, new Instant(100)); + tester.processTimestampedElement(input); + assertThat(tester.takeOutputElements(), contains(input)); + } + + static class ReifyTimestamps extends DoFn<Long, TimestampedValue<Long>> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(TimestampedValue.of(c.element(), c.timestamp())); + } + } + + @Test + public void processElementWithOutputTimestamp() throws Exception { CounterDoFn counterDoFn = new CounterDoFn(); DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java new file mode 100644 index 0000000..84b5b68 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java @@ -0,0 +1,233 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.isOneOf; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Objects; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link Latest.LatestFn}. 
+ */ +@RunWith(JUnit4.class) +public class LatestFnTests { + private static final Instant INSTANT = new Instant(100); + private static final long VALUE = 100 * INSTANT.getMillis(); + + private static final TimestampedValue<Long> TV = TimestampedValue.of(VALUE, INSTANT); + private static final TimestampedValue<Long> TV_MINUS_TEN = + TimestampedValue.of(VALUE - 10, INSTANT.minus(10)); + private static final TimestampedValue<Long> TV_PLUS_TEN = + TimestampedValue.of(VALUE + 10, INSTANT.plus(10)); + + @Rule + public final ExpectedException thrown = ExpectedException.none(); + + private final Latest.LatestFn<Long> fn = new Latest.LatestFn<>(); + private final Instant baseTimestamp = Instant.now(); + + @Test + public void testDefaultValue() { + assertThat(fn.defaultValue(), nullValue()); + } + + @Test + public void testCreateAccumulator() { + assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.createAccumulator()); + } + + @Test + public void testAddInputInitialAdd() { + TimestampedValue<Long> input = TV; + assertEquals(input, fn.addInput(fn.createAccumulator(), input)); + } + + @Test + public void testAddInputMinTimestamp() { + TimestampedValue<Long> input = TimestampedValue.atMinimumTimestamp(1234L); + assertEquals(input, fn.addInput(fn.createAccumulator(), input)); + } + + @Test + public void testAddInputEarlierValue() { + assertEquals(TV, fn.addInput(TV, TV_MINUS_TEN)); + } + + @Test + public void testAddInputLaterValue() { + assertEquals(TV_PLUS_TEN, fn.addInput(TV, TV_PLUS_TEN)); + } + + @Test + public void testAddInputSameTimestamp() { + TimestampedValue<Long> accum = TimestampedValue.of(100L, INSTANT); + TimestampedValue<Long> input = TimestampedValue.of(200L, INSTANT); + + assertThat("Latest for values with the same timestamp is chosen arbitrarily", + fn.addInput(accum, input), isOneOf(accum, input)); + } + + @Test + public void testAddInputNullAccumulator() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("accumulator"); +
fn.addInput(null, TV); + } + + @Test + public void testAddInputNullInput() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("input"); + fn.addInput(TV, null); + } + + @Test + public void testAddInputNullValue() { + TimestampedValue<Long> input = TimestampedValue.of(null, INSTANT.plus(10)); + assertEquals("Null values are allowed", input, fn.addInput(TV, input)); + } + + @Test + public void testMergeAccumulatorsMultipleValues() { + Iterable<TimestampedValue<Long>> accums = Lists.newArrayList( + TV, + TV_PLUS_TEN, + TV_MINUS_TEN + ); + + assertEquals(TV_PLUS_TEN, fn.mergeAccumulators(accums)); + } + + @Test + public void testMergeAccumulatorsSingleValue() { + assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV))); + } + + @Test + public void testMergeAccumulatorsEmptyIterable() { + ArrayList<TimestampedValue<Long>> emptyAccums = Lists.newArrayList(); + assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.mergeAccumulators(emptyAccums)); + } + + @Test + public void testMergeAccumulatorsDefaultAccumulator() { + TimestampedValue<Long> defaultAccum = fn.createAccumulator(); + assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV, defaultAccum))); + } + + @Test + public void testMergeAccumulatorsAllDefaultAccumulators() { + TimestampedValue<Long> defaultAccum = fn.createAccumulator(); + assertEquals(defaultAccum, fn.mergeAccumulators( + Lists.newArrayList(defaultAccum, defaultAccum))); + } + + @Test + public void testMergeAccumulatorsNullIterable() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("accumulators"); + fn.mergeAccumulators(null); + } + + @Test + public void testExtractOutput() { + assertEquals(TV.getValue(), fn.extractOutput(TV)); + } + + @Test + public void testExtractOutputDefaultAggregator() { + TimestampedValue<Long> accum = fn.createAccumulator(); + assertThat(fn.extractOutput(accum), nullValue()); + } + + @Test + public void testExtractOutputNullValue() { + TimestampedValue<Long> accum = 
TimestampedValue.of(null, baseTimestamp); + assertEquals(null, fn.extractOutput(accum)); + } + + @Test + public void testAggregator() throws Exception { + LatestAggregatorsFn<Long> doFn = new LatestAggregatorsFn<>(TV_MINUS_TEN.getValue()); + DoFnTester<Long, Long> harness = DoFnTester.of(doFn); + for (TimestampedValue<Long> element : Arrays.asList(TV, TV_PLUS_TEN, TV_MINUS_TEN)) { + harness.processTimestampedElement(element); + } + + assertEquals(TV_PLUS_TEN.getValue(), harness.getAggregatorValue(doFn.allValuesAgg)); + assertEquals(TV_MINUS_TEN.getValue(), harness.getAggregatorValue(doFn.specialValueAgg)); + assertThat(harness.getAggregatorValue(doFn.noValuesAgg), nullValue()); + } + + @Test + public void testDefaultCoderHandlesNull() throws CannotProvideCoderException { + Latest.LatestFn<Long> fn = new Latest.LatestFn<>(); + + CoderRegistry registry = new CoderRegistry(); + TimestampedValue.TimestampedValueCoder<Long> inputCoder = + TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of()); + + assertThat("Default output coder should handle null values", + fn.getDefaultOutputCoder(registry, inputCoder), instanceOf(NullableCoder.class)); + assertThat("Default accumulator coder should handle null values", + fn.getAccumulatorCoder(registry, inputCoder), instanceOf(NullableCoder.class)); + } + + static class LatestAggregatorsFn<T> extends DoFn<T, T> { + private final T specialValue; + LatestAggregatorsFn(T specialValue) { + this.specialValue = specialValue; + } + + Aggregator<TimestampedValue<T>, T> allValuesAgg = + createAggregator("allValues", new Latest.LatestFn<T>()); + + Aggregator<TimestampedValue<T>, T> specialValueAgg = + createAggregator("oneValue", new Latest.LatestFn<T>()); + + Aggregator<TimestampedValue<T>, T> noValuesAgg = + createAggregator("noValues", new Latest.LatestFn<T>()); + + @ProcessElement + public void processElement(ProcessContext c) { + TimestampedValue<T> val = TimestampedValue.of(c.element(), c.timestamp()); + 
allValuesAgg.addValue(val); + if (Objects.equals(c.element(), specialValue)) { + specialValueAgg.addValue(val); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java new file mode 100644 index 0000000..ce9ae37 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertEquals; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; + +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link Latest} {@link PTransform} and {@link Combine.CombineFn}. 
+ */ +@RunWith(JUnit4.class) +public class LatestTest implements Serializable { + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + @Test + @Category(NeedsRunner.class) + public void testGloballyEventTimestamp() { + TestPipeline p = TestPipeline.create(); + PCollection<String> output = + p.apply(Create.timestamped( + TimestampedValue.of("foo", new Instant(100)), + TimestampedValue.of("bar", new Instant(300)), + TimestampedValue.of("baz", new Instant(200)) + )) + .apply(Latest.<String>globally()); + + PAssert.that(output).containsInAnyOrder("bar"); + p.run(); + } + + @Test + public void testGloballyOutputCoder() { + TestPipeline p = TestPipeline.create(); + BigEndianLongCoder inputCoder = BigEndianLongCoder.of(); + + PCollection<Long> output = + p.apply(Create.of(1L, 2L).withCoder(inputCoder)) + .apply(Latest.<Long>globally()); + + Coder<Long> outputCoder = output.getCoder(); + assertThat(outputCoder, instanceOf(NullableCoder.class)); + assertEquals(inputCoder, ((NullableCoder<?>) outputCoder).getValueCoder()); + } + + @Test + @Category(NeedsRunner.class) + public void testGloballyEmptyCollection() { + TestPipeline p = TestPipeline.create(); + PCollection<Long> emptyInput = p.apply(Create.<Long>of() + // Explicitly set coder so that the runner enforces encodability.
+ .withCoder(VarLongCoder.of())); + PCollection<Long> output = emptyInput.apply(Latest.<Long>globally()); + + PAssert.that(output).containsInAnyOrder((Long) null); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testPerKeyEventTimestamp() { + TestPipeline p = TestPipeline.create(); + PCollection<KV<String, String>> output = + p.apply(Create.timestamped( + TimestampedValue.of(KV.of("A", "foo"), new Instant(100)), + TimestampedValue.of(KV.of("B", "bar"), new Instant(300)), + TimestampedValue.of(KV.of("A", "baz"), new Instant(200)) + )) + .apply(Latest.<String, String>perKey()); + + PAssert.that(output).containsInAnyOrder(KV.of("B", "bar"), KV.of("A", "baz")); + p.run(); + } + + @Test + public void testPerKeyOutputCoder() { + TestPipeline p = TestPipeline.create(); + KvCoder<String, Long> inputCoder = KvCoder.of( + AvroCoder.of(String.class), AvroCoder.of(Long.class)); + + PCollection<KV<String, Long>> output = + p.apply(Create.of(KV.of("foo", 1L)).withCoder(inputCoder)) + .apply(Latest.<String, Long>perKey()); + + assertEquals("Should use input coder for outputs", inputCoder, output.getCoder()); + } + + @Test + @Category(NeedsRunner.class) + public void testPerKeyEmptyCollection() { + TestPipeline p = TestPipeline.create(); + PCollection<KV<String, String>> output = + p.apply(Create.<KV<String, String>>of().withCoder(KvCoder.of( + StringUtf8Coder.of(), StringUtf8Coder.of()))) + .apply(Latest.<String, String>perKey()); + + PAssert.that(output).empty(); + p.run(); + } + + /** Helper method to easily create a timestamped value. 
*/ + private static TimestampedValue<Long> timestamped(Instant timestamp) { + return TimestampedValue.of(uniqueLong.incrementAndGet(), timestamp); + } + private static final AtomicLong uniqueLong = new AtomicLong(); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java new file mode 100644 index 0000000..a982f31 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.values; + +import static org.junit.Assert.assertEquals; + +import com.google.common.testing.EqualsTester; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; + +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link TimestampedValue}. + */ +@RunWith(JUnit4.class) +public class TimestampedValueTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testValues() { + Instant now = Instant.now(); + TimestampedValue<String> tsv = TimestampedValue.of("foobar", now); + + assertEquals(now, tsv.getTimestamp()); + assertEquals("foobar", tsv.getValue()); + } + + @Test + public void testAtMinimumTimestamp() { + TimestampedValue<String> tsv = TimestampedValue.atMinimumTimestamp("foobar"); + assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, tsv.getTimestamp()); + } + + @Test + public void testNullTimestamp() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("timestamp"); + TimestampedValue.of("foobar", null); + } + + @Test + public void testNullValue() { + TimestampedValue<String> tsv = TimestampedValue.atMinimumTimestamp(null); + assertEquals(null, tsv.getValue()); + } + + @Test + public void testEquality() { + new EqualsTester() + .addEqualityGroup( + TimestampedValue.of("foo", new Instant(1000)), + TimestampedValue.of("foo", new Instant(1000))) + .addEqualityGroup(TimestampedValue.of("foo", new Instant(2000))) + .addEqualityGroup(TimestampedValue.of("bar", new Instant(1000))) + .addEqualityGroup( + TimestampedValue.of("foo", BoundedWindow.TIMESTAMP_MIN_VALUE), + TimestampedValue.atMinimumTimestamp("foo")) + .testEquals(); + } +}