Repository: flink Updated Branches: refs/heads/master cd610a99e -> 67c2868b3
[hotfix] [tests] Simplify mocking of the ResultPartitionWriter Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67c2868b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67c2868b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67c2868b Branch: refs/heads/master Commit: 67c2868b33a5ed8430b38ac32c65444315b0658b Parents: cd610a9 Author: Stephan Ewen <[email protected]> Authored: Thu Nov 3 20:18:43 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Nov 4 12:04:22 2016 +0100 ---------------------------------------------------------------------- .../streaming/state/RocksDBAsyncSnapshotTest.java | 8 +++++--- .../io/network/api/writer/ResultPartitionWriter.java | 2 +- .../api/streamtask/StreamIterationHeadTest.java | 10 +--------- .../runtime/operators/StreamTaskTimerTest.java | 11 +---------- .../operators/TestProcessingTimeServiceTest.java | 11 +---------- .../streaming/runtime/tasks/OneInputStreamTaskTest.java | 12 +++--------- .../runtime/tasks/OneInputStreamTaskTestHarness.java | 4 ---- .../streaming/runtime/tasks/SourceStreamTaskTest.java | 10 +--------- .../streaming/runtime/tasks/StreamTaskTestHarness.java | 2 +- .../streaming/runtime/tasks/TwoInputStreamTaskTest.java | 11 ++--------- .../runtime/tasks/TwoInputStreamTaskTestHarness.java | 4 ---- 11 files changed, 16 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index 09979b8..1c5a91c 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -31,7 +31,6 @@ import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.SubtaskState; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.VoidNamespace; @@ -49,15 +48,18 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.util.OperatingSystem; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; + import org.junit.Assert; import org.junit.Assume; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; + import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -79,8 +81,8 @@ import static org.junit.Assert.assertNotNull; * Tests for asynchronous RocksDB Key/Value state checkpoints. */ @RunWith(PowerMockRunner.class) -@PrepareForTest({ResultPartitionWriter.class, FileSystem.class}) -@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) +@PrepareForTest({FileSystem.class}) +@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"}) @SuppressWarnings("serial") public class RocksDBAsyncSnapshotTest { http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java index 79c21c6..cfab34d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java @@ -37,7 +37,7 @@ import java.io.IOException; * The {@link ResultPartitionWriter} is the runtime API for producing results. It * supports two kinds of data to be sent: buffers and events. */ -public final class ResultPartitionWriter implements EventListener<TaskEvent> { +public class ResultPartitionWriter implements EventListener<TaskEvent> { private final ResultPartition partition; http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java index a047ed4..36cf53a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java @@ -18,21 +18,14 @@ package org.apache.flink.streaming.api.streamtask; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness; + import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import static org.junit.Assert.*; -@RunWith(PowerMockRunner.class) -@PrepareForTest({ ResultPartitionWriter.class }) -@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) public class StreamIterationHeadTest { @Test @@ -49,5 +42,4 @@ public class StreamIterationHeadTest { assertEquals(1, harness.getOutput().size()); assertEquals(new Watermark(Long.MAX_VALUE), harness.getOutput().peek()); } - } http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java index f23c6d2..e0e0e91 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java @@ -20,21 +20,15 @@ package org.apache.flink.streaming.runtime.operators; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; - import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.junit.Test; import java.util.concurrent.atomic.AtomicReference; @@ -43,9 +37,6 @@ import static org.junit.Assert.*; /** * Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}. */ -@RunWith(PowerMockRunner.class) -@PrepareForTest(ResultPartitionWriter.class) -@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) @SuppressWarnings("serial") public class StreamTaskTimerTest { http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java index a3b231b..cd1f253 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java @@ -19,29 +19,20 @@ package org.apache.flink.streaming.runtime.operators; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; - import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.junit.Test; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; -@RunWith(PowerMockRunner.class) -@PrepareForTest({ResultPartitionWriter.class}) -@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) public class TestProcessingTimeServiceTest { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index d31990a..42d7cec 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -15,8 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.runtime.tasks; +package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapFunction; @@ -33,7 +33,6 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -51,12 +50,10 @@ import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; + import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; + import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; @@ -86,9 +83,6 @@ import static org.junit.Assert.fail; * used as a representative to test OneInputStreamTask, since OneInputStreamTask is used for all * OneInputStreamOperators. */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({ResultPartitionWriter.class}) -@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) public class OneInputStreamTaskTest extends TestLogger { private static final ListStateDescriptor<Integer> TEST_DESCRIPTOR = http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java index 3cf055e..3126d71 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java @@ -48,10 +48,6 @@ import java.io.IOException; * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all * queues are empty. This must be used after entering some elements before checking the * desired output. - * - * <p> - * When using this you need to add the following line to your test class to setup Powermock: - * {@code {@literal @}PrepareForTest({ResultPartitionWriter.class})} */ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarness<OUT> { http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index 21c894e..b592fe8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -23,19 +23,15 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.util.TestHarnessUtil; + import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import java.io.Serializable; import java.util.List; @@ -51,12 +47,8 @@ import java.util.concurrent.atomic.AtomicLong; * These tests verify that the RichFunction methods are called (in correct order). And that * checkpointing/element emission don't occur concurrently. */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({ResultPartitionWriter.class}) -@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*" }) public class SourceStreamTaskTest { - /** * This test verifies that open() and close() are correctly called by the StreamTask. */ http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index ab7bf69..c531c0d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -117,7 +117,7 @@ public class StreamTaskTestHarness<OUT> { if (!(task instanceof StreamTask)) { throw new UnsupportedOperationException("getProcessingTimeService() only supported on StreamTasks."); } - return ((StreamTask) task).getProcessingTimeService(); + return ((StreamTask<?, ?>) task).getProcessingTimeService(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index 1bb3fb0..5aca7d6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -15,13 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.runtime.tasks; +package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.co.RichCoMapFunction; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -29,12 +28,9 @@ import org.apache.flink.streaming.api.operators.co.CoStreamMap; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.TestHarnessUtil; + import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -49,9 +45,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; * used as a representative to test TwoInputStreamTask, since TwoInputStreamTask is used for all * TwoInputStreamOperators. */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({ResultPartitionWriter.class}) -@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) public class TwoInputStreamTaskTest { /** http://git-wip-us.apache.org/repos/asf/flink/blob/67c2868b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java index 0e7565e..edb1642 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java @@ -54,10 +54,6 @@ import java.util.List; * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all * queues are empty. This must be used after entering some elements before checking the * desired output. - * - * <p> - * When using this you need to add the following line to your test class to setup Powermock: - * {@code {@literal @}PrepareForTest({ResultPartitionWriter.class})} */ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTestHarness<OUT> {
