Repository: beam
Updated Branches:
  refs/heads/master 25f6358d6 -> 6d9e91b29
[BEAM-2260] Improve construction-time errors for Text and AvroIO

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/711f79b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/711f79b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/711f79b3

Branch: refs/heads/master
Commit: 711f79b338b260185475df6d54af898f7dc51b9d
Parents: 25f6358
Author: Dan Halperin <[email protected]>
Authored: Thu May 11 10:45:20 2017 -0700
Committer: Dan Halperin <[email protected]>
Committed: Thu May 11 12:37:07 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/io/AvroIO.java | 2 ++
 .../main/java/org/apache/beam/sdk/io/TextIO.java | 2 ++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 16 ++++++++++++++++
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 12 +++++++++++-
 4 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/711f79b3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index c7e7233..d13c6ff 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -441,6 +441,8 @@ public class AvroIO {
             "Cannot set a filename policy and also a filename template or suffix.");
         checkState(getSchema() != null,
             "Need to set the schema of an AvroIO.Write transform.");
+        checkState(!getWindowedWrites() || (getFilenamePolicy() != null),
+            "When using windowed writes, a filename policy must be set via withFilenamePolicy().");
 
         FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
         if (usedFilenamePolicy == null) {
http://git-wip-us.apache.org/repos/asf/beam/blob/711f79b3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index dbfaeee..af6a069 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -430,6 +430,8 @@ public class TextIO {
             (getFilenamePolicy() == null)
                 || (getShardTemplate() == null && getFilenameSuffix() == null),
             "Cannot set a filename policy and also a filename template or suffix.");
+        checkState(!getWindowedWrites() || (getFilenamePolicy() != null),
+            "When using windowed writes, a filename policy must be set via withFilenamePolicy().");
 
         FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
         if (usedFilenamePolicy == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/711f79b3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 47b847f..d71f2f7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -54,6 +54,7 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -77,6 +78,7 @@ import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -93,6 +95,9 @@ public class AvroIOTest {
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Test
   public void testAvroIOGetName() {
     assertEquals("AvroIO.Read", AvroIO.read(String.class).from("/tmp/foo*/baz").getName());
@@ -560,4 +565,15 @@ public class AvroIOTest {
     assertThat(displayData, hasDisplayItem("numShards", 100));
     assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString()));
   }
+
+  @Test
+  public void testWindowedWriteRequiresFilenamePolicy() {
+    PCollection<String> emptyInput = p.apply(Create.empty(StringUtf8Coder.of()));
+    AvroIO.Write write = AvroIO.write(String.class).to("/tmp/some/file").withWindowedWrites();
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage(
+        "When using windowed writes, a filename policy must be set via withFilenamePolicy()");
+    emptyInput.apply(write);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/711f79b3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 685da82..0d8fbbd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -107,7 +107,6 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 @SuppressWarnings("unchecked")
 public class TextIOTest {
-
   private static final String MY_HEADER = "myHeader";
   private static final String MY_FOOTER = "myFooter";
   private static final String[] EMPTY = new String[] {};
@@ -1103,5 +1102,16 @@ public class TextIOTest {
     assertThat(splits, hasSize(equalTo(1)));
     SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
   }
+
+  @Test
+  public void testWindowedWriteRequiresFilenamePolicy() {
+    PCollection<String> emptyInput = p.apply(Create.empty(StringUtf8Coder.of()));
+    TextIO.Write write = TextIO.write().to("/tmp/some/file").withWindowedWrites();
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage(
+        "When using windowed writes, a filename policy must be set via withFilenamePolicy()");
+    emptyInput.apply(write);
+  }
 }
