This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 291414f9cde [FLINK-32149][tests] Remove trivial mockito usages
291414f9cde is described below

commit 291414f9cde3f178a3ea3d6c6ef1d5f3dc3fa9c3
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue May 23 09:40:56 2023 +0200

    [FLINK-32149][tests] Remove trivial mockito usages
---
 .../apache/flink/client/program/ClientTest.java    | 33 ++++-----
 .../compact/BatchPartitionCommitterSinkTest.java   | 14 ++--
 .../common/io/InputStreamFSInputWrapperTest.java   | 22 ++++--
 .../hdfstests/ContinuousFileProcessingTest.java    |  5 +-
 .../metric/process/FlinkMetricContainerTest.java   | 81 +++++++++-------------
 .../arrow/sources/ArrowSourceFunctionTestBase.java |  5 +-
 .../flink/runtime/blob/BlobCacheGetTest.java       |  5 +-
 .../iomanager/AsynchronousFileIOChannelTest.java   | 33 +++++----
 .../runtime/operators/hash/HashTableTest.java      |  4 +-
 .../runtime/rest/MultipartUploadResource.java      |  4 +-
 .../metrics/AggregatingMetricsHandlerTestBase.java | 14 ++--
 .../job/metrics/MetricsHandlerTestBase.java        | 38 +++++-----
 .../runtime/util/JvmExitOnFatalErrorTest.java      |  4 +-
 .../flink/test/state/BackendSwitchSpecs.java       |  5 +-
 14 files changed, 127 insertions(+), 140 deletions(-)

diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 92d4e027b83..b218daa0f50 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -58,8 +58,6 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import javax.annotation.Nonnull;
 
@@ -73,9 +71,6 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.fail;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /** Simple and maybe stupid test to check the {@link ClusterClient} class. */
 class ClientTest {
@@ -282,27 +277,22 @@ class ClientTest {
         
assertThatFuture(clusterClient.submitJob(jobGraph)).eventuallySucceeds().isNotNull();
     }
 
+    public static class TestEntrypoint {
+        public static void main(String[] args) {
+            ExecutionEnvironment.createLocalEnvironment();
+        }
+    }
+
     /**
      * This test verifies that the local execution environment cannot be 
created when the program is
      * submitted through a client.
      */
     @Test
     void tryLocalExecution() throws ProgramInvocationException {
-        PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
-
-        when(packagedProgramMock.getUserCodeClassLoader())
-                .thenReturn(packagedProgramMock.getClass().getClassLoader());
-
-        doAnswer(
-                        new Answer<Void>() {
-                            @Override
-                            public Void answer(InvocationOnMock invocation) 
throws Throwable {
-                                ExecutionEnvironment.createLocalEnvironment();
-                                return null;
-                            }
-                        })
-                .when(packagedProgramMock)
-                .invokeInteractiveModeForExecution();
+        final PackagedProgram packagedProgramMock =
+                PackagedProgram.newBuilder()
+                        .setEntryPointClassName(TestEntrypoint.class.getName())
+                        .build();
 
         try (final ClusterClient<?> client =
                 new MiniClusterClient(
@@ -320,7 +310,8 @@ class ClientTest {
                                 fail(
                                         "Creating the local execution 
environment should not be possible");
                             })
-                    .isInstanceOf(InvalidProgramException.class);
+                    .isInstanceOf(ProgramInvocationException.class)
+                    .hasCauseInstanceOf(InvalidProgramException.class);
         }
     }
 
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSinkTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSinkTest.java
index 13f9b84198b..94cfed73ca3 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSinkTest.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSinkTest.java
@@ -28,12 +28,12 @@ import 
org.apache.flink.connector.file.table.stream.compact.CompactMessages;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
@@ -46,7 +46,6 @@ import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.doReturn;
 
 /** Test for {@link BatchPartitionCommitterSink}. */
 public class BatchPartitionCommitterSinkTest {
@@ -128,10 +127,11 @@ public class BatchPartitionCommitterSinkTest {
             };
 
     private static RuntimeContext getMockRuntimeContext() {
-        RuntimeContext context = Mockito.mock(RuntimeContext.class);
-        doReturn(Thread.currentThread().getContextClassLoader())
-                .when(context)
-                .getUserCodeClassLoader();
-        return context;
+        return new MockStreamingRuntimeContext(false, 0, 0) {
+            @Override
+            public ClassLoader getUserCodeClassLoader() {
+                return Thread.currentThread().getContextClassLoader();
+            }
+        };
     }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/InputStreamFSInputWrapperTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/InputStreamFSInputWrapperTest.java
index aff3206bb05..d417d772996 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/InputStreamFSInputWrapperTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/InputStreamFSInputWrapperTest.java
@@ -20,18 +20,32 @@ package org.apache.flink.api.common.io;
 
 import org.junit.Test;
 
+import java.io.IOException;
 import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
+import static org.assertj.core.api.Assertions.assertThat;
 
 public class InputStreamFSInputWrapperTest {
 
     @Test
     public void testClose() throws Exception {
-        InputStream mockedInputStream = mock(InputStream.class);
+        final AtomicBoolean closeCalled = new AtomicBoolean(false);
+        InputStream mockedInputStream =
+                new InputStream() {
+                    @Override
+                    public int read() {
+                        return 0;
+                    }
+
+                    @Override
+                    public void close() throws IOException {
+                        closeCalled.set(true);
+                        super.close();
+                    }
+                };
         InputStreamFSInputWrapper wrapper = new 
InputStreamFSInputWrapper(mockedInputStream);
         wrapper.close();
-        verify(mockedInputStream).close();
+        assertThat(closeCalled).isTrue();
     }
 }
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 9955af599ab..480df52813b 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.hdfstests;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -46,6 +45,7 @@ import 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
 import 
org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.Preconditions;
@@ -61,7 +61,6 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -1135,7 +1134,7 @@ public class ContinuousFileProcessingTest {
             FileInputFormat<OUT> format, FileProcessingMode 
fileProcessingMode) {
         ContinuousFileMonitoringFunction<OUT> monitoringFunction =
                 new ContinuousFileMonitoringFunction<>(format, 
fileProcessingMode, 1, INTERVAL);
-        
monitoringFunction.setRuntimeContext(Mockito.mock(RuntimeContext.class));
+        monitoringFunction.setRuntimeContext(new 
MockStreamingRuntimeContext(false, 0, 0));
         return monitoringFunction;
     }
 }
diff --git 
a/flink-python/src/test/java/org/apache/flink/python/metric/process/FlinkMetricContainerTest.java
 
b/flink-python/src/test/java/org/apache/flink/python/metric/process/FlinkMetricContainerTest.java
index 9645eac91fb..16595fd8ac0 100644
--- 
a/flink-python/src/test/java/org/apache/flink/python/metric/process/FlinkMetricContainerTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/python/metric/process/FlinkMetricContainerTest.java
@@ -18,16 +18,14 @@
 
 package org.apache.flink.python.metric.process;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
 import org.apache.flink.runtime.metrics.groups.MetricGroupTest;
+import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
 
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.metrics.DistributionData;
@@ -40,25 +38,16 @@ import org.apache.beam.sdk.metrics.MetricName;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
 
 import java.util.Arrays;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 /** Tests for {@link FlinkMetricContainer}. */
 class FlinkMetricContainerTest {
 
-    @Mock private RuntimeContext runtimeContext;
-    @Mock private OperatorMetricGroup metricGroup;
+    private InterceptingOperatorMetricGroup metricGroup = new 
InterceptingOperatorMetricGroup();
 
     private FlinkMetricContainer container;
 
@@ -70,11 +59,24 @@ class FlinkMetricContainerTest {
 
     @BeforeEach
     void beforeTest() {
-        MockitoAnnotations.initMocks(this);
-        when(runtimeContext.getMetricGroup()).thenReturn(metricGroup);
-        when(metricGroup.addGroup(any(), any())).thenReturn(metricGroup);
-        when(metricGroup.addGroup(any())).thenReturn(metricGroup);
-        container = new FlinkMetricContainer(runtimeContext.getMetricGroup());
+        metricGroup =
+                new InterceptingOperatorMetricGroup() {
+                    @Override
+                    public MetricGroup addGroup(int name) {
+                        return this;
+                    }
+
+                    @Override
+                    public MetricGroup addGroup(String name) {
+                        return this;
+                    }
+
+                    @Override
+                    public MetricGroup addGroup(String key, String value) {
+                        return this;
+                    }
+                };
+        container = new FlinkMetricContainer(metricGroup);
     }
 
     @Test
@@ -107,9 +109,6 @@ class FlinkMetricContainerTest {
 
     @Test
     void testCounterMonitoringInfoUpdate() {
-        SimpleCounter userCounter = new SimpleCounter();
-        when(metricGroup.counter("myCounter")).thenReturn(userCounter);
-
         MonitoringInfo userMonitoringInfo =
                 new SimpleMonitoringInfoBuilder()
                         .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
@@ -119,15 +118,14 @@ class FlinkMetricContainerTest {
                         .setInt64SumValue(111)
                         .build();
 
-        assertThat(userCounter.getCount()).isEqualTo(0L);
+        assertThat(metricGroup.get("myCounter")).isNull();
         container.updateMetrics("step", ImmutableList.of(userMonitoringInfo));
+        Counter userCounter = (Counter) metricGroup.get("myCounter");
         assertThat(userCounter.getCount()).isEqualTo(111L);
     }
 
     @Test
     void testMeterMonitoringInfoUpdate() {
-        MeterView userMeter = new MeterView(new SimpleCounter());
-        when(metricGroup.meter(eq("myMeter"), 
any(Meter.class))).thenReturn(userMeter);
         String namespace =
                 "[\"key\", \"value\", \"MetricGroupType.key\", 
\"MetricGroupType.value\", \"60\"]";
 
@@ -139,9 +137,10 @@ class FlinkMetricContainerTest {
                         .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
"anyPTransform")
                         .setInt64SumValue(111)
                         .build();
-        assertThat(userMeter.getCount()).isEqualTo(0L);
-        assertThat(userMeter.getRate()).isEqualTo(0.0);
+
+        assertThat(metricGroup.get("myMeter")).isNull();
         container.updateMetrics("step", ImmutableList.of(userMonitoringInfo));
+        MeterView userMeter = (MeterView) metricGroup.get("myMeter");
         userMeter.update();
         assertThat(userMeter.getCount()).isEqualTo(111L);
         assertThat(userMeter.getRate()).isEqualTo(1.85); // 111 div 60 = 1.85
@@ -159,15 +158,10 @@ class FlinkMetricContainerTest {
                         .build();
 
         container.updateMetrics("step", ImmutableList.of(userMonitoringInfo));
-        verify(metricGroup)
-                .gauge(
-                        eq("myGauge"),
-                        argThat(
-                                
(ArgumentMatcher<FlinkMetricContainer.FlinkGauge>)
-                                        argument -> {
-                                            Long actual = argument.getValue();
-                                            return actual.equals(111L);
-                                        }));
+        FlinkMetricContainer.FlinkGauge myGauge =
+                (FlinkMetricContainer.FlinkGauge) metricGroup.get("myGauge");
+
+        assertThat(myGauge.getValue()).isEqualTo(111L);
     }
 
     @Test
@@ -184,16 +178,9 @@ class FlinkMetricContainerTest {
         container.updateMetrics("step", ImmutableList.of(userMonitoringInfo));
         // The one Flink distribution that gets created is a 
FlinkDistributionGauge; here we verify
         // its initial (and in this test, final) value
-        verify(metricGroup)
-                .gauge(
-                        eq("myDistribution"),
-                        argThat(
-                                
(ArgumentMatcher<FlinkMetricContainer.FlinkDistributionGauge>)
-                                        argument -> {
-                                            DistributionResult actual = 
argument.getValue();
-                                            DistributionResult expected =
-                                                    
DistributionResult.create(30, 10, 1, 5);
-                                            return actual.equals(expected);
-                                        }));
+        FlinkMetricContainer.FlinkDistributionGauge myGauge =
+                (FlinkMetricContainer.FlinkDistributionGauge) 
metricGroup.get("myDistribution");
+
+        assertThat(myGauge.getValue()).isEqualTo(DistributionResult.create(30, 
10, 1, 5));
     }
 }
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/ArrowSourceFunctionTestBase.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/ArrowSourceFunctionTestBase.java
index c569bd9da46..f72d673cbc5 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/ArrowSourceFunctionTestBase.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/ArrowSourceFunctionTestBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.runtime.arrow.sources;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.testutils.MultiShotLatch;
@@ -28,6 +27,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.arrow.ArrowUtils;
 import org.apache.flink.table.runtime.arrow.ArrowWriter;
@@ -41,7 +41,6 @@ import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.assertj.core.api.HamcrestCondition;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -299,7 +298,7 @@ abstract class ArrowSourceFunctionTestBase {
                 createArrowSourceFunction(
                         ArrowUtils.readArrowBatches(
                                 Channels.newChannel(new 
ByteArrayInputStream(baos.toByteArray()))));
-        
arrowSourceFunction.setRuntimeContext(Mockito.mock(RuntimeContext.class));
+        arrowSourceFunction.setRuntimeContext(new 
MockStreamingRuntimeContext(false, 0, 0));
         return arrowSourceFunction;
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
index 9b16d66c387..513011c9505 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
@@ -72,7 +72,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
-import static org.mockito.Mockito.mock;
 
 /**
  * Tests for GET-specific parts of the {@link BlobCacheService}.
@@ -547,8 +546,8 @@ public class BlobCacheGetTest extends TestLogger {
             final JobID jobId, final BlobKey.BlobType blobType, final boolean 
cacheAccessesHAStore)
             throws IOException, InterruptedException, ExecutionException {
         final Configuration config = new Configuration();
-        final BlobStore blobStoreServer = mock(BlobStore.class);
-        final BlobStore blobStoreCache = mock(BlobStore.class);
+        final BlobStore blobStoreServer = new VoidBlobStore();
+        final BlobStore blobStoreCache = new VoidBlobStore();
 
         final int numberConcurrentGetOperations = 3;
         final List<CompletableFuture<File>> getOperations =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
index 0da6fe70520..af85fc4073c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
@@ -21,11 +21,10 @@ package org.apache.flink.runtime.io.disk.iomanager;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.util.TestNotificationListener;
 
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,9 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
 
-@RunWith(PowerMockRunner.class)
 public class AsynchronousFileIOChannelTest {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AsynchronousFileIOChannelTest.class);
@@ -66,7 +63,7 @@ public class AsynchronousFileIOChannelTest {
 
         final RequestQueue<WriteRequest> requestQueue = new 
RequestQueue<WriteRequest>();
 
-        final RequestDoneCallback<Buffer> ioChannelCallback = 
mock(RequestDoneCallback.class);
+        final RequestDoneCallback<Buffer> ioChannelCallback = new 
NoOpCallback<>();
 
         final TestNotificationListener listener = new 
TestNotificationListener();
 
@@ -83,8 +80,8 @@ public class AsynchronousFileIOChannelTest {
                 final CountDownLatch sync = new CountDownLatch(3);
 
                 // The mock requests
-                final Buffer buffer = mock(Buffer.class);
-                final WriteRequest request = mock(WriteRequest.class);
+                final Buffer buffer = BufferBuilderTestUtils.buildSomeBuffer();
+                final WriteRequest request = new NoOpWriteRequest();
 
                 // Add requests task
                 Callable<Void> addRequestsTask =
@@ -199,7 +196,7 @@ public class AsynchronousFileIOChannelTest {
         final RequestQueue<WriteRequest> requestQueue = new 
RequestQueue<WriteRequest>();
 
         @SuppressWarnings("unchecked")
-        final RequestDoneCallback<Buffer> ioChannelCallback = 
mock(RequestDoneCallback.class);
+        final RequestDoneCallback<Buffer> ioChannelCallback = new 
NoOpCallback<>();
 
         final TestNotificationListener listener = new 
TestNotificationListener();
 
@@ -213,7 +210,7 @@ public class AsynchronousFileIOChannelTest {
 
                 final CountDownLatch sync = new CountDownLatch(2);
 
-                final WriteRequest request = mock(WriteRequest.class);
+                final WriteRequest request = new NoOpWriteRequest();
 
                 ioChannel.close();
 
@@ -339,7 +336,9 @@ public class AsynchronousFileIOChannelTest {
 
             BlockChannelWriterWithCallback<MemorySegment> writer =
                     new AsynchronousBlockWriterWithCallback(
-                            channelId, ioMan.getWriteRequestQueue(channelId), 
new NoOpCallback()) {
+                            channelId,
+                            ioMan.getWriteRequestQueue(channelId),
+                            new NoOpCallback<>()) {
 
                         private int numBlocks;
 
@@ -377,13 +376,21 @@ public class AsynchronousFileIOChannelTest {
         }
     }
 
-    private static class NoOpCallback implements 
RequestDoneCallback<MemorySegment> {
+    private static class NoOpCallback<T> implements RequestDoneCallback<T> {
 
         @Override
-        public void requestSuccessful(MemorySegment buffer) {}
+        public void requestSuccessful(T buffer) {}
 
         @Override
-        public void requestFailed(MemorySegment buffer, IOException e) {}
+        public void requestFailed(T buffer, IOException e) {}
+    }
+
+    private static class NoOpWriteRequest implements WriteRequest {
+        @Override
+        public void requestDone(IOException ioex) {}
+
+        @Override
+        public void write() {}
     }
 
     private static class FailingWriteRequest implements WriteRequest {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
index 40b1da22645..e58005f6915 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
@@ -40,7 +40,6 @@ import org.apache.flink.util.MutableObjectIterator;
 
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
@@ -173,9 +172,8 @@ public class HashTableTest {
         final TypeComparator<ByteValue> probeComparator =
                 new ValueComparator<>(true, ByteValue.class);
 
-        @SuppressWarnings("unchecked")
         final TypePairComparator<ByteValue, ByteValue> pairComparator =
-                Mockito.mock(TypePairComparator.class);
+                new GenericPairComparator<>(buildComparator, probeComparator);
 
         try (final IOManager ioMan = new IOManagerAsync()) {
             final int pageSize = 32 * 1024;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 19939b2dce9..80601bf83f3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
 import org.apache.flink.runtime.rest.util.TestRestServerEndpoint;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.BiConsumerWithException;
@@ -67,7 +68,6 @@ import java.util.stream.Stream;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
 
 /** Test base for verifying support of multipart uploads via REST. */
 public class MultipartUploadResource extends ExternalResource {
@@ -103,7 +103,7 @@ public class MultipartUploadResource extends 
ExternalResource {
         configuredUploadDir = temporaryFolder.newFolder().toPath();
         config.setString(WebOptions.UPLOAD_DIR, 
configuredUploadDir.toString());
 
-        RestfulGateway mockRestfulGateway = mock(RestfulGateway.class);
+        RestfulGateway mockRestfulGateway = new TestingRestfulGateway();
 
         final GatewayRetriever<RestfulGateway> mockGatewayRetriever =
                 () -> CompletableFuture.completedFuture(mockRestfulGateway);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
index c96a541796d..5e0b5fdbeeb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
@@ -32,8 +32,8 @@ import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetr
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
@@ -54,7 +54,6 @@ import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.powermock.api.mockito.PowerMockito.mock;
 
 /** Test base for handlers that extend {@link 
AbstractAggregatingMetricsHandler}. */
 public abstract class AggregatingMetricsHandlerTestBase<
@@ -62,16 +61,13 @@ public abstract class AggregatingMetricsHandlerTestBase<
                 P extends AbstractAggregatedMetricsParameters<?>>
         extends TestLogger {
 
-    private static final CompletableFuture<String> TEST_REST_ADDRESS;
     private static final DispatcherGateway MOCK_DISPATCHER_GATEWAY;
     private static final GatewayRetriever<DispatcherGateway> LEADER_RETRIEVER;
     private static final Time TIMEOUT = Time.milliseconds(50);
     private static final Map<String, String> TEST_HEADERS = 
Collections.emptyMap();
 
     static {
-        TEST_REST_ADDRESS = 
CompletableFuture.completedFuture("localhost:12345");
-
-        MOCK_DISPATCHER_GATEWAY = mock(DispatcherGateway.class);
+        MOCK_DISPATCHER_GATEWAY = new TestingDispatcherGateway();
 
         LEADER_RETRIEVER =
                 new GatewayRetriever<DispatcherGateway>() {
@@ -89,9 +85,9 @@ public abstract class AggregatingMetricsHandlerTestBase<
     @Before
     public void setUp() throws Exception {
         MetricFetcher fetcher =
-                new MetricFetcherImpl<RestfulGateway>(
-                        mock(GatewayRetriever.class),
-                        mock(MetricQueryServiceRetriever.class),
+                new MetricFetcherImpl<>(
+                        () -> null,
+                        rpcServiceAddress -> null,
                         Executors.directExecutor(),
                         TestingUtils.TIMEOUT,
                         
MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
index dd9fe021fe3..ed1d3959403 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
@@ -28,13 +28,12 @@ import 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
 
 import java.util.Collections;
 import java.util.Map;
@@ -44,7 +43,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
 
 /** Unit test base class for subclasses of {@link AbstractMetricsHandler}. */
 public abstract class MetricsHandlerTestBase<T extends AbstractMetricsHandler> 
extends TestLogger {
@@ -53,18 +51,15 @@ public abstract class MetricsHandlerTestBase<T extends 
AbstractMetricsHandler> e
 
     private static final int TEST_METRIC_VALUE = 1000;
 
-    static final CompletableFuture<String> TEST_REST_ADDRESS =
-            CompletableFuture.completedFuture("localhost:12345");
-
     static final Time TIMEOUT = Time.milliseconds(50);
 
     static final Map<String, String> TEST_HEADERS = Collections.emptyMap();
 
-    @Mock MetricFetcher mockMetricFetcher;
+    MetricFetcher mockMetricFetcher;
 
     GatewayRetriever<DispatcherGateway> leaderRetriever;
 
-    @Mock private DispatcherGateway mockDispatcherGateway;
+    private final DispatcherGateway mockDispatcherGateway = new 
TestingDispatcherGateway();
 
     private T metricsHandler;
 
@@ -72,23 +67,28 @@ public abstract class MetricsHandlerTestBase<T extends 
AbstractMetricsHandler> e
 
     @Before
     public void setUp() {
-        MockitoAnnotations.initMocks(this);
+        final MetricStore metricStore = new MetricStore();
+        metricStore.add(
+                new MetricDump.CounterDump(
+                        getQueryScopeInfo(), TEST_METRIC_NAME, 
TEST_METRIC_VALUE));
+        mockMetricFetcher =
+                new MetricFetcher() {
+                    @Override
+                    public MetricStore getMetricStore() {
+                        return metricStore;
+                    }
 
-        this.leaderRetriever =
-                new GatewayRetriever<DispatcherGateway>() {
                     @Override
-                    public CompletableFuture<DispatcherGateway> getFuture() {
-                        return 
CompletableFuture.completedFuture(mockDispatcherGateway);
+                    public void update() {}
+
+                    @Override
+                    public long getLastUpdateTime() {
+                        return 0;
                     }
                 };
+        this.leaderRetriever = () -> 
CompletableFuture.completedFuture(mockDispatcherGateway);
         this.pathParameters = getPathParameters();
         this.metricsHandler = getMetricsHandler();
-
-        final MetricStore metricStore = new MetricStore();
-        metricStore.add(
-                new MetricDump.CounterDump(
-                        getQueryScopeInfo(), TEST_METRIC_NAME, 
TEST_METRIC_VALUE));
-        when(mockMetricFetcher.getMetricStore()).thenReturn(metricStore);
     }
 
     /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index dadac718a67..79439059074 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -63,7 +63,6 @@ import 
org.apache.flink.runtime.taskexecutor.NoOpPartitionProducerStateChecker;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
-import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.NoOpCheckpointResponder;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
@@ -86,7 +85,6 @@ import java.util.concurrent.Executors;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.junit.Assume.assumeFalse;
-import static org.mockito.Mockito.mock;
 
 /**
  * Test that verifies the behavior of blocking shutdown hooks and of the {@link
@@ -224,7 +222,7 @@ public class JvmExitOnFatalErrorTest extends TestLogger {
                                 changelogStorage,
                                 new 
TaskExecutorStateChangelogStoragesManager(),
                                 null,
-                                mock(CheckpointResponder.class));
+                                NoOpCheckpointResponder.INSTANCE);
 
                 Task task =
                         new Task(
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java 
b/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java
index 2053fa76646..9e825b682d2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/BackendSwitchSpecs.java
@@ -30,7 +30,6 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -44,7 +43,6 @@ import 
org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import org.junit.rules.TemporaryFolder;
-import org.mockito.Mockito;
 
 import java.util.Collection;
 
@@ -139,7 +137,8 @@ public final class BackendSwitchSpecs {
                 throws Exception {
             ExecutionConfig executionConfig = new ExecutionConfig();
             return new HeapKeyedStateBackendBuilder<>(
-                            Mockito.mock(TaskKvStateRegistry.class),
+                            new KvStateRegistry()
+                                    .createTaskRegistry(new JobID(), new 
JobVertexID()),
                             StringSerializer.INSTANCE,
                             this.getClass().getClassLoader(),
                             numKeyGroups,


Reply via email to