Repository: beam Updated Branches: refs/heads/master 95beda69b -> 6413299a2
[BEAM-708] use AutoValue to reduce boilerplate in BoundedReadFromUnboundedSource Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eeec9f12 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eeec9f12 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eeec9f12 Branch: refs/heads/master Commit: eeec9f128f5d5fd4db6de4fd90d4967ff87587e4 Parents: c53249d Author: Kai Jiang <jiang...@gmail.com> Authored: Wed Jan 18 07:00:44 2017 -0800 Committer: Kai Jiang <jiang...@gmail.com> Committed: Wed Jan 18 23:37:14 2017 -0800 ---------------------------------------------------------------------- .../sdk/io/BoundedReadFromUnboundedSource.java | 71 +++++++++++++------- 1 file changed, 46 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/eeec9f12/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java index 8b63bfd..7e25a01 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java @@ -18,12 +18,14 @@ package org.apache.beam.sdk.io; import com.google.api.client.util.BackOff; +import com.google.auto.value.AutoValue; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; @@ -82,7 +84,12 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle this.source = source; this.maxNumRecords = maxNumRecords; this.maxReadTime = maxReadTime; - this.adaptedSource = new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime); + this.adaptedSource = + new AutoValue_BoundedReadFromUnboundedSource_UnboundedToBoundedSourceAdapter + .Builder() + .setSource(source) + .setMaxNumRecords(maxNumRecords) + .setMaxReadTime(maxReadTime).build(); } /** @@ -133,17 +140,27 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle .include("source", source); } - private static class UnboundedToBoundedSourceAdapter<T> + /** + * An Adapter wraps the underlying {@link UnboundedSource} with the specified bounds on + * number of records and read time into {@link BoundedSource}. + */ + @AutoValue + public abstract static class UnboundedToBoundedSourceAdapter<T> extends BoundedSource<ValueWithRecordId<T>> { - private final UnboundedSource<T, ?> source; - private final long maxNumRecords; - private final Duration maxReadTime; - - private UnboundedToBoundedSourceAdapter( - UnboundedSource<T, ?> source, long maxNumRecords, Duration maxReadTime) { - this.source = source; - this.maxNumRecords = maxNumRecords; - this.maxReadTime = maxReadTime; + @Nullable abstract UnboundedSource<T, ?> getSource(); + @Nullable abstract long getMaxNumRecords(); + @Nullable abstract Duration getMaxReadTime(); + + public abstract String toString(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setSource(UnboundedSource<T, ?> source); + abstract Builder<T> setMaxNumRecords(long maxNumRecords); + abstract Builder<T> setMaxReadTime(Duration maxReadTime); + abstract UnboundedToBoundedSourceAdapter<T> build(); } /** @@ -174,14 +191,17 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle public List<? extends BoundedSource<ValueWithRecordId<T>>> splitIntoBundles( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { List<UnboundedToBoundedSourceAdapter<T>> result = new ArrayList<>(); - int numInitialSplits = numInitialSplits(maxNumRecords); + int numInitialSplits = numInitialSplits(getMaxNumRecords()); List<? extends UnboundedSource<T, ?>> splits = - source.generateInitialSplits(numInitialSplits, options); + getSource().generateInitialSplits(numInitialSplits, options); int numSplits = splits.size(); - long[] numRecords = splitNumRecords(maxNumRecords, numSplits); + long[] numRecords = splitNumRecords(getMaxNumRecords(), numSplits); for (int i = 0; i < numSplits; i++) { - result.add( - new UnboundedToBoundedSourceAdapter<T>(splits.get(i), numRecords[i], maxReadTime)); + result.add(toBuilder() + .setSource(splits.get(i)) + .setMaxNumRecords(numRecords[i]) + .setMaxReadTime(getMaxReadTime()) + .build()); } return result; } @@ -194,34 +214,34 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle @Override public Coder<ValueWithRecordId<T>> getDefaultOutputCoder() { - return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getDefaultOutputCoder()); + return ValueWithRecordId.ValueWithRecordIdCoder.of(getSource().getDefaultOutputCoder()); } @Override public void validate() { - source.validate(); + getSource().validate(); } @Override public BoundedReader<ValueWithRecordId<T>> createReader(PipelineOptions options) throws IOException { - return new Reader(source.createReader(options, null)); + return new Reader(getSource().createReader(options, null)); } @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(source); + builder.delegate(getSource()); } private class Reader extends BoundedReader<ValueWithRecordId<T>> { private long recordsRead = 0L; - private Instant endTime = Instant.now().plus(maxReadTime); + private Instant endTime = Instant.now().plus(getMaxReadTime()); private UnboundedSource.UnboundedReader<T> reader; private Reader(UnboundedSource.UnboundedReader<T> reader) { this.recordsRead = 0L; - if (maxReadTime != null) { - this.endTime = Instant.now().plus(maxReadTime); + if (getMaxReadTime() != null) { + this.endTime = Instant.now().plus(getMaxReadTime()); } else { this.endTime = null; } @@ -230,7 +250,8 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle @Override public boolean start() throws IOException { - if (maxNumRecords <= 0 || (maxReadTime != null && maxReadTime.getMillis() == 0)) { + if (getMaxNumRecords() <= 0 || (getMaxReadTime() != null + && getMaxReadTime().getMillis() == 0)) { return false; } @@ -244,7 +265,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle @Override public boolean advance() throws IOException { - if (recordsRead >= maxNumRecords) { + if (recordsRead >= getMaxNumRecords()) { finalizeCheckpoint(); return false; }