[BEAM-2114] Tests for KafkaIO: use ExpectedException rule
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/34e00465 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/34e00465 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/34e00465 Branch: refs/heads/master Commit: 34e0046512282872f205166162b16b616f834e93 Parents: 10fc5f8 Author: peay <[email protected]> Authored: Sun Apr 30 18:22:38 2017 +0200 Committer: Eugene Kirpichov <[email protected]> Committed: Sun Apr 30 09:45:30 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 33 +++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/34e00465/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 2f895fe..591c099 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -102,7 +102,6 @@ import org.hamcrest.collection.IsIterableContainingInAnyOrder; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -240,8 +239,8 @@ public class KafkaIOTest { } /** - * Creates a consumer with two topics, with 5 partitions each. - * numElements are (round-robin) assigned all the 10 partitions. + * Creates a consumer with two topics, with 10 partitions each. + * numElements are (round-robin) assigned all the 20 partitions. */ private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform( int numElements, @@ -266,8 +265,9 @@ public class KafkaIOTest { } /** - * Creates a consumer with two topics, with 5 partitions each. - * numElements are (round-robin) assigned all the 10 partitions. + * Creates a consumer with two topics, with 10 partitions each. + * numElements are (round-robin) assigned all the 20 partitions. + * Coders are specified explicitly. */ private static KafkaIO.Read<Integer, Long> mkKafkaReadTransformWithCoders( int numElements, @@ -918,17 +918,22 @@ public class KafkaIOTest { } } + // class for which a coder cannot be infered + private static class NonInferableObject { + + } + // class for testing coder inference - private static class ObjectDeserializer - implements Deserializer<Object> { + private static class NonInferableObjectDeserializer + implements Deserializer<NonInferableObject> { @Override public void configure(Map<String, ?> configs, boolean isKey) { } @Override - public Object deserialize(String topic, byte[] bytes) { - return new Object(); + public NonInferableObject deserialize(String topic, byte[] bytes) { + return new NonInferableObject(); } @Override @@ -953,10 +958,14 @@ public class KafkaIOTest { instanceof VarLongCoder); } - @Test(expected = RuntimeException.class) - public void testInferKeyCoderFailure() { + @Rule public ExpectedException cannotInferException = ExpectedException.none(); + + @Test + public void testInferKeyCoderFailure() throws Exception { + cannotInferException.expect(RuntimeException.class); + CoderRegistry registry = CoderRegistry.createDefault(); - KafkaIO.inferCoder(registry, ObjectDeserializer.class); + KafkaIO.inferCoder(registry, NonInferableObjectDeserializer.class); } @Test
