Repository: beam Updated Branches: refs/heads/master df36bd9d7 -> aadbe36ff
[BEAM-2734] Unbreaks some Dataflow ValidatesRunner tests Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97b12d53 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97b12d53 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97b12d53 Branch: refs/heads/master Commit: 97b12d53f956dfffd7594ae5b9433ac9df9c793a Parents: df36bd9 Author: Eugene Kirpichov <[email protected]> Authored: Fri Aug 4 16:46:42 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Aug 4 17:49:41 2017 -0700 ---------------------------------------------------------------------- .../construction/UnboundedReadFromBoundedSource.java | 11 ++++++++--- .../src/main/java/org/apache/beam/sdk/io/Source.java | 14 +++++++++++++- 2 files changed, 21 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/97b12d53/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java index 55f9519..24eb384 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java @@ -91,6 +91,11 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle } @Override + protected Coder<T> getDefaultOutputCoder() { + return source.getDefaultOutputCoder(); + } + + @Override public String getKindString() { return String.format("Read(%s)", NameUtils.approximateSimpleName(source)); } @@ -161,14 +166,14 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle } @Override - public Coder<T> getOutputCoder() { - return boundedSource.getOutputCoder(); + public Coder<T> getDefaultOutputCoder() { + return boundedSource.getDefaultOutputCoder(); } @SuppressWarnings({"rawtypes", "unchecked"}) @Override public Coder<Checkpoint<T>> getCheckpointMarkCoder() { - return new CheckpointCoder<>(boundedSource.getOutputCoder()); + return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder()); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/beam/blob/97b12d53/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java index 32a7270..872c135 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java @@ -64,11 +64,23 @@ public abstract class Source<T> implements Serializable, HasDisplayData { /** @deprecated Override {@link #getOutputCoder()} instead. */ @Deprecated public Coder<T> getDefaultOutputCoder() { - throw new UnsupportedOperationException("Source needs to override getOutputCoder()"); + // If the subclass doesn't override getDefaultOutputCoder(), hopefully it overrides the proper + // version - getOutputCoder(). Check that it does, before calling the method (if subclass + // doesn't override it, we'll call the default implementation and get infinite recursion). + try { + if (getClass().getMethod("getOutputCoder").getDeclaringClass().equals(Source.class)) { + throw new UnsupportedOperationException( + getClass() + " needs to override getOutputCoder()."); + } + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + return getOutputCoder(); } /** Returns the {@code Coder} to use for the data read from this source. */ public Coder<T> getOutputCoder() { + // Call the old method for compatibility. return getDefaultOutputCoder(); }
