Ensure a TypedPValue has a Coder on finishSpecifying Coders cannot be set on a PValue that is is marked as finished specifying, and a coder is required for every TypedPValue in a pipeline.
Ensure that a coder is always available when a TypedPValue has been finished by invoking getCoder() (which will throw an exception if no coder is available) ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115601351 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fba91473 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fba91473 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fba91473 Branch: refs/heads/master Commit: fba914736cc3f3401aa96c252a1336e9e5865b1e Parents: 06c8911 Author: tgroh <[email protected]> Authored: Thu Feb 25 13:59:52 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:28 2016 -0800 ---------------------------------------------------------------------- .../cloud/dataflow/sdk/values/TypedPValue.java | 3 ++ .../dataflow/sdk/values/TypedPValueTest.java | 51 +++++++++++--------- 2 files changed, 31 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fba91473/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TypedPValue.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TypedPValue.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TypedPValue.java index 9b210b2..29fd639 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TypedPValue.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TypedPValue.java @@ -83,6 +83,9 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue { return; } super.finishSpecifying(); + // Ensure that this TypedPValue has a coder by inferring the coder if none exists; If not, + // this will throw an exception. + getCoder(); } ///////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fba91473/sdk/src/test/java/com/google/cloud/dataflow/sdk/values/TypedPValueTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/values/TypedPValueTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/values/TypedPValueTest.java index 4c62111..b0a13ec 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/values/TypedPValueTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/values/TypedPValueTest.java @@ -20,7 +20,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.VarIntCoder; @@ -132,28 +131,34 @@ public class TypedPValueTest { @Test public void testParDoWithNoSideOutputsErrorDoesNotMentionTupleTag() { Pipeline p = TestPipeline.create(); - PCollection<EmptyClass> input = p - .apply(Create.of(1, 2, 3)) - .apply(ParDo.of(new EmptyClassDoFn())); - - try { - input.getCoder(); - } catch (IllegalStateException exc) { - String message = exc.getMessage(); - - // Output specific to ParDo TupleTag side outputs should not be present. - assertThat(message, not(containsString("erasure"))); - assertThat(message, not(containsString("see TupleTag Javadoc"))); - // Instead, expect output suggesting other possible fixes. - assertThat(message, - containsString("Building a Coder using a registered CoderFactory failed")); - assertThat(message, - containsString("Building a Coder from the @DefaultCoder annotation failed")); - assertThat(message, - containsString("Building a Coder from the fallback CoderProvider failed")); - return; - } - fail("Should have thrown IllegalStateException due to failure to infer a coder."); + PCollection<EmptyClass> input = + p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new EmptyClassDoFn())); + + thrown.expect(IllegalStateException.class); + + // Output specific to ParDo TupleTag side outputs should not be present. + thrown.expectMessage(not(containsString("erasure"))); + thrown.expectMessage(not(containsString("see TupleTag Javadoc"))); + // Instead, expect output suggesting other possible fixes. + thrown.expectMessage(containsString("Building a Coder using a registered CoderFactory failed")); + thrown.expectMessage( + containsString("Building a Coder from the @DefaultCoder annotation failed")); + thrown.expectMessage(containsString("Building a Coder from the fallback CoderProvider failed")); + + input.getCoder(); + } + + @Test + public void testFinishSpecifyingShouldFailIfNoCoderInferrable() { + Pipeline p = TestPipeline.create(); + PCollection<EmptyClass> unencodable = + p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new EmptyClassDoFn())); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Unable to return a default Coder"); + thrown.expectMessage("Inferring a Coder from the CoderRegistry failed"); + + unencodable.finishSpecifying(); } }
