Repository: beam Updated Branches: refs/heads/master 62f041e56 -> 781c15522
Introduces GenerateSequence transform. It is a replacement for CountingInput, which will be deprecated. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/88c66129 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/88c66129 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/88c66129 Branch: refs/heads/master Commit: 88c66129ba0cff9c8319f21ad317597d9bd8b5cd Parents: 62f041e Author: Eugene Kirpichov <[email protected]> Authored: Tue Apr 18 16:48:38 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Apr 21 16:53:49 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/CountingInput.java | 2 +- .../org/apache/beam/sdk/io/CountingSource.java | 4 +- .../apache/beam/sdk/io/GenerateSequence.java | 194 +++++++++++++++++++ .../apache/beam/sdk/io/CountingSourceTest.java | 4 +- .../beam/sdk/io/GenerateSequenceTest.java | 194 +++++++++++++++++++ 5 files changed, 393 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/88c66129/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java index 72ebd97..ab006d4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java @@ -247,7 +247,7 @@ public class CountingInput { public PCollection<Long> expand(PBegin begin) { Unbounded<Long> read = Read.from( - CountingSource.createUnbounded() + CountingSource.createUnboundedFrom(0) .withTimestampFn(timestampFn) .withRate(elementsPerPeriod, period)); if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) { 
http://git-wip-us.apache.org/repos/asf/beam/blob/88c66129/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java index 73b663d..dd018f4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java @@ -103,8 +103,8 @@ public class CountingSource { * Create a new {@link UnboundedCountingSource}. */ // package-private to return a typed UnboundedCountingSource rather than the UnboundedSource type. - static UnboundedCountingSource createUnbounded() { - return new UnboundedCountingSource(0, 1, 1L, Duration.ZERO, new NowTimestampFn()); + static UnboundedCountingSource createUnboundedFrom(long start) { + return new UnboundedCountingSource(start, 1, 1L, Duration.ZERO, new NowTimestampFn()); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/88c66129/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java new file mode 100644 index 0000000..189539f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * A {@link PTransform} that produces longs starting from the given value, and either up to the + * given limit or until {@link Long#MAX_VALUE} / until the given time elapses. + * + * <p>The bounded {@link GenerateSequence} is implemented based on {@link OffsetBasedSource} and + * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it + * supports dynamic work rebalancing. + * + * <p>To produce a bounded {@code PCollection<Long>}: + * + * <pre>{@code + * Pipeline p = ... + * PCollection<Long> bounded = p.apply(GenerateSequence.from(0).to(1000)); + * }</pre> + * + * <p>To produce an unbounded {@code PCollection<Long>}, simply do not specify {@link #to(long)}, + * calling {@link #withTimestampFn(SerializableFunction)} to provide values with timestamps other + * than {@link Instant#now}. + * + * <pre>{@code + * Pipeline p = ... + * + * // To use processing time as the element timestamp. 
+ * PCollection<Long> unbounded = p.apply(GenerateSequence.from(0)); + * // Or, to use a provided function to set the element timestamp. + * PCollection<Long> unboundedWithTimestamps = + * p.apply(GenerateSequence.from(0).withTimestampFn(someFn)); + * }</pre> + * + * <p>In all cases, the sequence of numbers is generated in parallel, so there is no inherent + * ordering between the generated values - it is only guaranteed that all values in the given range + * will be present in the resulting {@link PCollection}. + */ +@AutoValue +public abstract class GenerateSequence extends PTransform<PBegin, PCollection<Long>> { + abstract long getFrom(); + + abstract long getTo(); + + @Nullable + abstract SerializableFunction<Long, Instant> getTimestampFn(); + + abstract long getElementsPerPeriod(); + + @Nullable + abstract Duration getPeriod(); + + @Nullable + abstract Duration getMaxReadTime(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setFrom(long from); + + abstract Builder setTo(long to); + + abstract Builder setTimestampFn(SerializableFunction<Long, Instant> timestampFn); + + abstract Builder setElementsPerPeriod(long elementsPerPeriod); + + abstract Builder setPeriod(Duration period); + + abstract Builder setMaxReadTime(Duration maxReadTime); + + abstract GenerateSequence build(); + } + + /** Specifies the minimum number to generate (inclusive). */ + public static GenerateSequence from(long from) { + checkArgument(from >= 0, "Value of from must be non-negative, but was: %s", from); + return new AutoValue_GenerateSequence.Builder() + .setFrom(from) + .setTo(-1) + .setElementsPerPeriod(0) + .build(); + } + + /** Specifies the maximum number to generate (exclusive). 
*/ + public GenerateSequence to(long to) { + checkArgument( + getTo() == -1 || getTo() >= getFrom(), "Degenerate range [%s, %s)", getFrom(), getTo()); + return toBuilder().setTo(to).build(); + } + + /** Specifies the function to use to assign timestamps to the elements. */ + public GenerateSequence withTimestampFn(SerializableFunction<Long, Instant> timestampFn) { + return toBuilder().setTimestampFn(timestampFn).build(); + } + + /** Specifies to generate at most a given number of elements per a given period. */ + public GenerateSequence withRate(long numElements, Duration periodLength) { + checkArgument( + numElements > 0, + "Number of elements in withRate must be positive, but was: %s", + numElements); + checkNotNull(periodLength, "periodLength in withRate must be non-null"); + return toBuilder().setElementsPerPeriod(numElements).setPeriod(periodLength).build(); + } + + /** Specifies to stop generating elements after the given time. */ + public GenerateSequence withMaxReadTime(Duration maxReadTime) { + return toBuilder().setMaxReadTime(maxReadTime).build(); + } + + @Override + public PCollection<Long> expand(PBegin input) { + boolean isRangeUnbounded = getTo() < 0; + boolean usesUnboundedFeatures = + getTimestampFn() != null || getElementsPerPeriod() > 0 || getMaxReadTime() != null; + if (!isRangeUnbounded && !usesUnboundedFeatures) { + // This is the only case when we can use the bounded CountingSource. 
+ return input.apply(Read.from(CountingSource.createSourceForSubrange(getFrom(), getTo()))); + } + + CountingSource.UnboundedCountingSource source = CountingSource.createUnboundedFrom(getFrom()); + if (getTimestampFn() != null) { + source = source.withTimestampFn(getTimestampFn()); + } + if (getElementsPerPeriod() > 0) { + source = source.withRate(getElementsPerPeriod(), getPeriod()); + } + + Read.Unbounded<Long> readUnbounded = Read.from(source); + + if (getMaxReadTime() == null) { + if (isRangeUnbounded) { + return input.apply(readUnbounded); + } else { + return input.apply(readUnbounded.withMaxNumRecords(getTo() - getFrom())); + } + } else { + BoundedReadFromUnboundedSource<Long> withMaxReadTime = + readUnbounded.withMaxReadTime(getMaxReadTime()); + if (isRangeUnbounded) { + return input.apply(withMaxReadTime); + } else { + return input.apply(withMaxReadTime.withMaxNumRecords(getTo() - getFrom())); + } + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("from", getFrom()).withLabel("Generate sequence from")); + builder.addIfNotDefault( + DisplayData.item("to", getTo()).withLabel("Generate sequence to (exclusive)"), -1L); + builder.addIfNotNull( + DisplayData.item( + "timestampFn", getTimestampFn() == null ? 
null : getTimestampFn().getClass()) + .withLabel("Timestamp Function")); + builder.addIfNotNull( + DisplayData.item("maxReadTime", getMaxReadTime()).withLabel("Maximum Read Time")); + if (getElementsPerPeriod() > 0) { + builder.add( + DisplayData.item("elementsPerPeriod", getElementsPerPeriod()) + .withLabel("Elements per period")); + builder.add(DisplayData.item("period", getPeriod()).withLabel("Period")); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/88c66129/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java index 8807164..d56160a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java @@ -202,7 +202,7 @@ public class CountingSourceTest { PCollection<Long> input = p.apply( Read.from( - CountingSource.createUnbounded() + CountingSource.createUnboundedFrom(0) .withTimestampFn(new ValueAsTimestampFn()) .withRate(1, period)) .withMaxNumRecords(numElements)); @@ -260,7 +260,7 @@ public class CountingSourceTest { int numSplits = 10; UnboundedCountingSource initial = - CountingSource.createUnbounded().withRate(elementsPerPeriod, period); + CountingSource.createUnboundedFrom(0).withRate(elementsPerPeriod, period); List<? 
extends UnboundedSource<Long, ?>> splits = initial.split(numSplits, p.getOptions()); assertEquals("Expected exact splitting", numSplits, splits.size()); http://git-wip-us.apache.org/repos/asf/beam/blob/88c66129/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java new file mode 100644 index 0000000..49af479 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.ValidatesRunner; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Distinct; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Max; +import org.apache.beam.sdk.transforms.Min; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link GenerateSequence}. 
*/ +@RunWith(JUnit4.class) +public class GenerateSequenceTest { + public static void addCountingAsserts(PCollection<Long> input, long start, long end) { + // Count == numElements + PAssert.thatSingleton(input.apply("Count", Count.<Long>globally())).isEqualTo(end - start); + // Unique count == numElements + PAssert.thatSingleton( + input.apply(Distinct.<Long>create()).apply("UniqueCount", Count.<Long>globally())) + .isEqualTo(end - start); + // Min == start + PAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(start); + // Max == end-1 + PAssert.thatSingleton(input.apply("Max", Max.<Long>globally())).isEqualTo(end - 1); + } + + @Rule public TestPipeline p = TestPipeline.create(); + + @Test + @Category(ValidatesRunner.class) + public void testBoundedInput() { + long numElements = 1000; + PCollection<Long> input = p.apply(GenerateSequence.from(0).to(numElements)); + + addCountingAsserts(input, 0, numElements); + p.run(); + } + + @Test + @Category(ValidatesRunner.class) + public void testEmptyBoundedInput() { + PCollection<Long> input = p.apply(GenerateSequence.from(0).to(0)); + + PAssert.that(input).empty(); + p.run(); + } + + @Test + @Category(ValidatesRunner.class) + public void testEmptyBoundedInputSubrange() { + PCollection<Long> input = p.apply(GenerateSequence.from(42).to(42)); + + PAssert.that(input).empty(); + p.run(); + } + + @Test + @Category(ValidatesRunner.class) + public void testBoundedInputSubrange() { + long start = 10; + long end = 1000; + PCollection<Long> input = p.apply(GenerateSequence.from(start).to(end)); + + addCountingAsserts(input, start, end); + p.run(); + } + + @Test + public void testBoundedDisplayData() { + PTransform<?, ?> input = GenerateSequence.from(0).to(1234); + DisplayData displayData = DisplayData.from(input); + assertThat(displayData, hasDisplayItem("from", 0)); + assertThat(displayData, hasDisplayItem("to", 1234)); + } + + @Test + public void testBoundedDisplayDataSubrange() { + PTransform<?, ?> input = 
GenerateSequence.from(12).to(1234); + DisplayData displayData = DisplayData.from(input); + assertThat(displayData, hasDisplayItem("from", 12)); + assertThat(displayData, hasDisplayItem("to", 1234)); + } + + @Test + @Category(NeedsRunner.class) + public void testUnboundedInputRate() { + long numElements = 5000; + + long elemsPerPeriod = 10L; + Duration periodLength = Duration.millis(8); + PCollection<Long> input = + p.apply(GenerateSequence.from(0).to(numElements).withRate(elemsPerPeriod, periodLength)); + + addCountingAsserts(input, 0, numElements); + long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod; + Instant startTime = Instant.now(); + p.run(); + Instant endTime = Instant.now(); + assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true)); + } + + private static class ElementValueDiff extends DoFn<Long, Long> { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + c.output(c.element() - c.timestamp().getMillis()); + } + } + + @Test + @Category(ValidatesRunner.class) + public void testUnboundedInputTimestamps() { + long numElements = 1000; + + PCollection<Long> input = + p.apply(GenerateSequence.from(0).to(numElements).withTimestampFn(new ValueAsTimestampFn())); + addCountingAsserts(input, 0, numElements); + + PCollection<Long> diffs = + input + .apply("TimestampDiff", ParDo.of(new ElementValueDiff())) + .apply("DistinctTimestamps", Distinct.<Long>create()); + // This assert also confirms that diffs only has one unique value. 
+ PAssert.thatSingleton(diffs).isEqualTo(0L); + + p.run(); + } + + @Test + public void testUnboundedDisplayData() { + Duration maxReadTime = Duration.standardHours(5); + SerializableFunction<Long, Instant> timestampFn = + new SerializableFunction<Long, Instant>() { + @Override + public Instant apply(Long input) { + return Instant.now(); + } + }; + + PTransform<?, ?> input = + GenerateSequence.from(0).to(1234).withMaxReadTime(maxReadTime).withTimestampFn(timestampFn); + + DisplayData displayData = DisplayData.from(input); + + assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime)); + assertThat(displayData, hasDisplayItem("timestampFn", timestampFn.getClass())); + } + + /** + * A timestamp function that uses the given value as the timestamp. Because the input values will + * not wrap, this function is non-decreasing and meets the timestamp function criteria laid out in + * {@link GenerateSequence#withTimestampFn(SerializableFunction)}. + */ + private static class ValueAsTimestampFn implements SerializableFunction<Long, Instant> { + @Override + public Instant apply(Long input) { + return new Instant(input); + } + } +}
