Repository: samza Updated Branches: refs/heads/master c5557140d -> a6701f64a
SAMZA-1758: Configuring a timeout for TestRunner to execute the SamzaJob Author: sanil15 <[email protected]> Author: Sanil Jain <[email protected]> Reviewers: Bharath Kumarasubramanian <[email protected]> Closes #563 from Sanil15/SAMZA-1758 and squashes the following commits: 5b198a0f [Sanil Jain] Fixing a bug for preconditions 6d2fd334 [Sanil Jain] Adressing Review 1eb3847c [Sanil Jain] Removing explicit handling of TimeoutException and adding more docs 0a9689c9 [sanil15] Addressing Review, moving tests from SamzaFailureTests, improving doc, adding validation b79b5628 [sanil15] Using ExceptionUtils to get full stack trace, adding more docs dd816ff8 [sanil15] Addressing review, using waitForFinish(timeout) to configure a timeout for TestRunner, adding some Failure tests 903c1162 [sanil15] Configuring a timeout for TestRunner to execute the SamzaJob Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a6701f64 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a6701f64 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a6701f64 Branch: refs/heads/master Commit: a6701f64a692b1357138886f652f8a8699ae1534 Parents: c555714 Author: sanil15 <[email protected]> Authored: Thu Jun 28 15:48:28 2018 -0700 Committer: Boris S <[email protected]> Committed: Thu Jun 28 15:48:28 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/test/framework/TestRunner.java | 24 +++++++-- .../AsyncStreamTaskIntegrationTest.java | 20 +++++++- .../StreamApplicationIntegrationTest.java | 54 ++++++++++++++++++-- .../framework/StreamTaskIntegrationTest.java | 21 +++++++- 4 files changed, 110 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a6701f64/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java 
---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index dee10c6..6e647d9 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -20,6 +20,7 @@ package org.apache.samza.test.framework; import com.google.common.base.Preconditions; +import java.time.Duration; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -28,6 +29,8 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.config.InMemorySystemConfig; @@ -36,6 +39,7 @@ import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory; +import org.apache.samza.job.ApplicationStatus; import org.apache.samza.operators.KV; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory; @@ -55,6 +59,7 @@ import org.apache.samza.task.AsyncStreamTask; import org.apache.samza.task.StreamTask; import org.apache.samza.test.framework.stream.CollectionStream; import org.apache.samza.test.framework.system.CollectionStreamSystemSpec; +import org.junit.Assert; /** @@ -267,22 +272,33 @@ public class TestRunner { return this; } + /** * Utility to run a test configured using TestRunner + * + * @param timeout time to wait for the high level application or low level task to finish. 
This timeout does not include + * input stream initialization time or the assertion time over output streams. This timeout only accounts + * for the time that the Samza job takes to run. The Samza job won't be invoked with a negative or zero timeout. + * @throws SamzaException if the Samza job fails with an exception and returns UnsuccessfulFinish as the status code */ - public void run() { + public void run(Duration timeout) { Preconditions.checkState((app == null && taskClass != null) || (app != null && taskClass == null), "TestRunner should run for Low Level Task api or High Level Application Api"); + Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(), + "Timeouts should be positive"); final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); if (app == null) { runner.runTask(); - runner.waitForFinish(); } else { runner.run(app); - runner.waitForFinish(); + } + boolean timedOut = !runner.waitForFinish(timeout); + Assert.assertFalse("Timed out waiting for application to finish", timedOut); + ApplicationStatus status = runner.status(app); + if (status.getStatusCode() == ApplicationStatus.StatusCode.UnsuccessfulFinish) { + throw new SamzaException(ExceptionUtils.getStackTrace(status.getThrowable())); } } - /** * Utility to read the messages from a stream from the beginning, this is supposed to be used after executing the * TestRunner in order to assert over the streams (ex output streams). 
http://git-wip-us.apache.org/repos/asf/samza/blob/a6701f64/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java index c991b8c..ad25cae 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java @@ -19,6 +19,7 @@ package org.apache.samza.test.framework; +import java.time.Duration; import java.util.Arrays; import java.util.List; import org.apache.samza.test.framework.stream.CollectionStream; @@ -41,9 +42,26 @@ public class AsyncStreamTaskIntegrationTest { .of(MyAsyncStreamTask.class) .addInputStream(input) .addOutputStream(output) - .run(); + .run(Duration.ofSeconds(2)); Assert.assertThat(TestRunner.consumeStream(output, 1000).get(0), IsIterableContainingInOrder.contains(outputList.toArray())); } + + /** + * Job should fail because it times out too soon + */ + @Test(expected = AssertionError.class) + public void testSamzaJobTimeoutFailureForAsyncTask() { + List<Integer> inputList = Arrays.asList(1, 2, 3, 4); + + CollectionStream<Integer> input = CollectionStream.of("async-test", "ints", inputList); + CollectionStream output = CollectionStream.empty("async-test", "ints-out"); + + TestRunner + .of(MyAsyncStreamTask.class) + .addInputStream(input) + .addOutputStream(output) + .run(Duration.ofMillis(1)); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/a6701f64/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java ---------------------------------------------------------------------- diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java index 307c1b5..8ac40e1 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java @@ -18,14 +18,17 @@ */ package org.apache.samza.test.framework; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Random; +import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; import org.apache.samza.operators.KV; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; +import org.apache.samza.test.controlmessages.TestData; import org.apache.samza.test.framework.stream.CollectionStream; import static org.apache.samza.test.controlmessages.TestData.PageView; import org.junit.Assert; @@ -34,7 +37,12 @@ import org.junit.Test; public class StreamApplicationIntegrationTest { - final StreamApplication app = (streamGraph, cfg) -> { + final StreamApplication pageViewFilter = (streamGraph, cfg) -> { + streamGraph.<KV<String, TestData.PageView>>getInputStream("PageView").map( + StreamApplicationIntegrationTest.Values.create()).filter(pv -> pv.getPageKey().equals("inbox")); + }; + + final StreamApplication pageViewParition = (streamGraph, cfg) -> { streamGraph.<KV<String, PageView>>getInputStream("PageView") .map(Values.create()) .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1") @@ -62,11 +70,11 @@ public class StreamApplicationIntegrationTest { CollectionStream output = CollectionStream.empty("test", "Output", 10); TestRunner - .of(app) + .of(pageViewParition) .addInputStream(input) .addOutputStream(output) .addOverrideConfig("job.default.system", "test") - .run(); + 
.run(Duration.ofMillis(1500)); Assert.assertEquals(TestRunner.consumeStream(output, 10000).get(random.nextInt(count)).size(), 1); } @@ -76,4 +84,44 @@ public class StreamApplicationIntegrationTest { return (M m) -> m.getValue(); } } + + /** + * Job should fail since it is missing config "job.default.system" for partitionBy Operator + */ + @Test(expected = SamzaException.class) + public void testSamzaJobStartMissingConfigFailureForStreamApplication() { + + CollectionStream<TestData.PageView> input = CollectionStream.of("test", "PageView", new ArrayList<>()); + CollectionStream output = CollectionStream.empty("test", "Output", 10); + + TestRunner + .of(pageViewParition) + .addInputStream(input) + .addOutputStream(output) + .run(Duration.ofMillis(1000)); + } + + /** + * Null page key is passed in input data which should fail filter logic + */ + @Test(expected = SamzaException.class) + public void testSamzaJobFailureForStreamApplication() { + Random random = new Random(); + int count = 10; + List<TestData.PageView> pageviews = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)]; + int memberId = i; + pageviews.add(new TestData.PageView(null, memberId)); + } + + CollectionStream<TestData.PageView> input = CollectionStream.of("test", "PageView", pageviews); + CollectionStream output = CollectionStream.empty("test", "Output", 1); + + TestRunner.of(pageViewFilter) + .addInputStream(input) + .addOutputStream(output) + .addOverrideConfig("job.default.system", "test") + .run(Duration.ofMillis(1000)); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/a6701f64/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java index e052539..2cc5977 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java @@ -19,8 +19,10 @@ package org.apache.samza.test.framework; +import java.time.Duration; import java.util.Arrays; import java.util.List; +import org.apache.samza.SamzaException; import org.apache.samza.test.framework.stream.CollectionStream; import org.hamcrest.collection.IsIterableContainingInOrder; import org.junit.Assert; @@ -36,10 +38,27 @@ public class StreamTaskIntegrationTest { CollectionStream<Integer> input = CollectionStream.of("test", "input", inputList); CollectionStream output = CollectionStream.empty("test", "output"); - TestRunner.of(MyStreamTestTask.class).addInputStream(input).addOutputStream(output).run(); + TestRunner.of(MyStreamTestTask.class).addInputStream(input).addOutputStream(output).run(Duration.ofSeconds(1)); Assert.assertThat(TestRunner.consumeStream(output, 1000).get(0), IsIterableContainingInOrder.contains(outputList.toArray())); } + /** + * Samza job logic expects integers, but doubles are passed here which results in failure + */ + @Test(expected = SamzaException.class) + public void testSamzaJobFailureForSyncTask() { + List<Double> inputList = Arrays.asList(1.2, 2.3, 3.33, 4.5); + + CollectionStream<Double> input = CollectionStream.of("test", "doubles", inputList); + CollectionStream output = CollectionStream.empty("test", "output"); + + TestRunner + .of(MyStreamTestTask.class) + .addInputStream(input) + .addOutputStream(output) + .run(Duration.ofSeconds(1)); + } + }
