pnowojski closed pull request #6892: [FLINK-10607][network][test] Unify to 
remove duplicated NoOpResultPartitionConsumableNotifier
URL: https://github.com/apache/flink/pull/6892
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index f790b5f02b9..f0f1926b008 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -22,8 +22,8 @@
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -313,7 +313,7 @@ private static ResultPartition createResultPartition(
                        channels,
                        channels,
                        mock(ResultPartitionManager.class),
-                       mock(ResultPartitionConsumableNotifier.class),
+                       new NoOpResultPartitionConsumableNotifier(),
                        mock(IOManager.class),
                        false);
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResultPartitionConsumableNotifier.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResultPartitionConsumableNotifier.java
new file mode 100644
index 00000000000..c666d649077
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResultPartitionConsumableNotifier.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+
+/**
+ * Test implementation of {@link ResultPartitionConsumableNotifier}.
+ */
+public class NoOpResultPartitionConsumableNotifier implements 
ResultPartitionConsumableNotifier {
+       @Override
+       public void notifyPartitionConsumable(JobID jobId, ResultPartitionID 
partitionId, TaskActions taskActions) {}
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 2afd6d4aff9..448e989c6e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -27,6 +27,7 @@
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -105,8 +106,7 @@ public void testConcurrentConsumeMultiplePartitions() 
throws Exception {
                        (parallelism * producerBufferPoolSize) + (parallelism * 
parallelism),
                        TestBufferFactory.BUFFER_SIZE);
 
-               final ResultPartitionConsumableNotifier 
partitionConsumableNotifier =
-                       mock(ResultPartitionConsumableNotifier.class);
+               final ResultPartitionConsumableNotifier 
partitionConsumableNotifier = new NoOpResultPartitionConsumableNotifier();
 
                final TaskActions taskActions = mock(TaskActions.class);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 02d7a1da5a2..5ceb8236691 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -55,7 +55,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -714,7 +714,7 @@ public void testTaskSubmission() throws Exception {
                        mock(TaskManagerActions.class),
                        mock(CheckpointResponder.class),
                        libraryCacheManager,
-                       mock(ResultPartitionConsumableNotifier.class),
+                       new NoOpResultPartitionConsumableNotifier(),
                        mock(PartitionProducerStateChecker.class));
 
                final JobManagerTable jobManagerTable = new JobManagerTable();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 3edf5f11975..a05a2e0e580 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -43,6 +43,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -216,7 +217,7 @@ private Task createTask(Class<? extends AbstractInvokable> 
invokableClass) throw
                when(libCache.getClassLoader(any(JobID.class))).thenReturn(new 
TestUserCodeClassLoader());
 
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
-               ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
+               ResultPartitionConsumableNotifier consumableNotifier = new 
NoOpResultPartitionConsumableNotifier();
                PartitionProducerStateChecker partitionProducerStateChecker = 
mock(PartitionProducerStateChecker.class);
                Executor executor = mock(Executor.class);
                TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 49e995e8d81..a354e2cc049 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -47,6 +47,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -250,7 +251,7 @@ public void testExecutionFailsInBlobsMissing() throws 
Exception {
        public void testExecutionFailsInNetworkRegistration() throws Exception {
                // mock a network manager that rejects registration
                final ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
-               final ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
+               final ResultPartitionConsumableNotifier consumableNotifier = 
new NoOpResultPartitionConsumableNotifier();
                final PartitionProducerStateChecker 
partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
                final TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
 
@@ -571,7 +572,7 @@ public void testTriggerPartitionStateUpdate() throws 
Exception {
                final PartitionProducerStateChecker partitionChecker = 
mock(PartitionProducerStateChecker.class);
                final TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
 
-               final ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
+               final ResultPartitionConsumableNotifier consumableNotifier = 
new NoOpResultPartitionConsumableNotifier();
                final NetworkEnvironment network = 
mock(NetworkEnvironment.class);
                
when(network.getResultPartitionManager()).thenReturn(mock(ResultPartitionManager.class));
                
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
@@ -941,7 +942,7 @@ private void setState(Task task, ExecutionState state) {
                        libraryCacheManager = mock(LibraryCacheManager.class);
                        
when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
 
-                       consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
+                       consumableNotifier = new 
NoOpResultPartitionConsumableNotifier();
                        partitionProducerStateChecker = 
mock(PartitionProducerStateChecker.class);
 
                        final ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index c826e774ec2..88aff103790 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -46,7 +46,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -64,9 +64,6 @@
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.testutils.TestJvmProcess;
 import org.apache.flink.util.OperatingSystem;
@@ -269,12 +266,6 @@ public void acknowledgeCheckpoint(JobID j, 
ExecutionAttemptID e, long i, Checkpo
                        public void declineCheckpoint(JobID j, 
ExecutionAttemptID e, long l, Throwable t) {}
                }
 
-               private static final class 
NoOpResultPartitionConsumableNotifier implements 
ResultPartitionConsumableNotifier {
-
-                       @Override
-                       public void notifyPartitionConsumable(JobID j, 
ResultPartitionID p, TaskActions t) {}
-               }
-
                private static final class NoOpPartitionProducerStateChecker 
implements PartitionProducerStateChecker {
 
                        @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 1b769c80f0a..941b9372d1f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -39,8 +39,8 @@
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -316,10 +316,4 @@ public void triggerPartitionProducerStateCheck(
                @Override
                public void failExternally(Throwable cause) {}
        }
-
-       private static final class NoOpResultPartitionConsumableNotifier 
implements ResultPartitionConsumableNotifier {
-
-               @Override
-               public void notifyPartitionConsumable(JobID j, 
ResultPartitionID p, TaskActions t) {}
-       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index beed46df399..d3ef2b286b3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -46,7 +46,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -288,7 +288,7 @@ private static Task createTask(
                                blobService.getPermanentBlobService()),
                        new TestingTaskManagerRuntimeInfo(),
                        
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
-                       mock(ResultPartitionConsumableNotifier.class),
+                       new NoOpResultPartitionConsumableNotifier(),
                        mock(PartitionProducerStateChecker.class),
                        mock(Executor.class));
        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 09276986c58..95227a6a4b3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -45,7 +45,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -175,7 +175,7 @@ public void 
testConcurrentAsyncCheckpointCannotFailFinishedStreamTask() throws E
                        mock(FileCache.class),
                        taskManagerRuntimeInfo,
                        
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
-                       mock(ResultPartitionConsumableNotifier.class),
+                       new NoOpResultPartitionConsumableNotifier(),
                        mock(PartitionProducerStateChecker.class),
                        Executors.directExecutor());
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index bc145799fb3..a593bd5e004 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -52,6 +52,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -892,7 +893,7 @@ public static Task createTask(
                
when(libCache.getClassLoader(any(JobID.class))).thenReturn(StreamTaskTest.class.getClassLoader());
 
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
-               ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
+               ResultPartitionConsumableNotifier consumableNotifier = new 
NoOpResultPartitionConsumableNotifier();
                PartitionProducerStateChecker partitionProducerStateChecker = 
mock(PartitionProducerStateChecker.class);
                Executor executor = mock(Executor.class);
                TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index cd8a4fafd9a..51c15afeca9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -47,7 +47,7 @@
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -254,7 +254,7 @@ private static Task createTask(
                                        blobService.getPermanentBlobService()),
                                new TestingTaskManagerRuntimeInfo(),
                                
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
-                               mock(ResultPartitionConsumableNotifier.class),
+                               new NoOpResultPartitionConsumableNotifier(),
                                mock(PartitionProducerStateChecker.class),
                                Executors.directExecutor());
        }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to