Gets rid of raw type in TextIO.Read.watchForNewFiles
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/184f7a9b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/184f7a9b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/184f7a9b Branch: refs/heads/master Commit: 184f7a9b31641641cdb4bc7ddcf3556c0514f71b Parents: d64f2cc Author: Eugene Kirpichov <ekirpic...@gmail.com> Authored: Wed Aug 16 14:25:33 2017 -0700 Committer: Eugene Kirpichov <ekirpic...@gmail.com> Committed: Wed Aug 30 11:55:18 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/TextIO.java | 15 ++++--- .../org/apache/beam/sdk/transforms/Watch.java | 42 ++++++++++++++++++++ 2 files changed, 51 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/184f7a9b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index cbc17ff..835008f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.sdk.transforms.Watch.Growth.ignoreInput; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; @@ -250,7 +251,7 @@ public class TextIO { abstract Duration getWatchForNewFilesInterval(); @Nullable - abstract TerminationCondition getWatchForNewFilesTerminationCondition(); + abstract TerminationCondition<?, ?> getWatchForNewFilesTerminationCondition(); abstract boolean getHintMatchesManyFiles(); abstract EmptyMatchTreatment getEmptyMatchTreatment(); @@ -262,7 +263,8 @@ public class TextIO { abstract Builder setFilepattern(ValueProvider<String> filepattern); abstract Builder setCompressionType(CompressionType compressionType); abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval); - abstract Builder setWatchForNewFilesTerminationCondition(TerminationCondition condition); + abstract Builder setWatchForNewFilesTerminationCondition( + TerminationCondition<?, ?> condition); abstract Builder setHintMatchesManyFiles(boolean hintManyFiles); abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment); @@ -312,7 +314,8 @@ public class TextIO { * @see TerminationCondition */ @Experimental(Kind.SPLITTABLE_DO_FN) - public Read watchForNewFiles(Duration pollInterval, TerminationCondition terminationCondition) { + public Read watchForNewFiles( + Duration pollInterval, TerminationCondition<?, ?> terminationCondition) { return toBuilder() .setWatchForNewFilesInterval(pollInterval) .setWatchForNewFilesTerminationCondition(terminationCondition) @@ -352,9 +355,9 @@ public class TextIO { .withCompressionType(getCompressionType()) .withEmptyMatchTreatment(getEmptyMatchTreatment()); if (getWatchForNewFilesInterval() != null) { - readAll = - readAll.watchForNewFiles( - getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition()); + TerminationCondition<String, ?> readAllCondition = + ignoreInput(getWatchForNewFilesTerminationCondition()); + readAll = readAll.watchForNewFiles(getWatchForNewFilesInterval(), readAllCondition); } return input .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of())) http://git-wip-us.apache.org/repos/asf/beam/blob/184f7a9b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java index 9da2408..21f0641 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java @@ -264,6 +264,15 @@ public class Watch { } /** + * Wraps a given input-independent {@link TerminationCondition} as an equivalent condition + * with a given input type, passing {@code null} to the original condition as input. + */ + public static <InputT, StateT> TerminationCondition<InputT, StateT> ignoreInput( + TerminationCondition<?, StateT> condition) { + return new IgnoreInput<>(condition); + } + + /** * Returns a {@link TerminationCondition} that holds after the given time has elapsed after the * current input was seen. */ @@ -344,6 +353,39 @@ public class Watch { } } + static class IgnoreInput<InputT, StateT> implements TerminationCondition<InputT, StateT> { + private final TerminationCondition<?, StateT> wrapped; + + IgnoreInput(TerminationCondition<?, StateT> wrapped) { + this.wrapped = wrapped; + } + + @Override + public Coder<StateT> getStateCoder() { + return wrapped.getStateCoder(); + } + + @Override + public StateT forNewInput(Instant now, InputT input) { + return wrapped.forNewInput(now, null); + } + + @Override + public StateT onSeenNewOutput(Instant now, StateT state) { + return wrapped.onSeenNewOutput(now, state); + } + + @Override + public boolean canStopPolling(Instant now, StateT state) { + return wrapped.canStopPolling(now, state); + } + + @Override + public String toString(StateT state) { + return wrapped.toString(state); + } + } + static class AfterTotalOf<InputT> implements TerminationCondition< InputT, KV<Instant /* timeStarted */, ReadableDuration /* maxTimeSinceInput */>> {