This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit c1442ea8ad2df63361ab9bfbca1d910afc6ebd7d
Author: Arvid Heise <ar...@apache.org>
AuthorDate: Thu Apr 10 14:52:57 2025 +0200

    [FLINK-37644] Remove mockito
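
    Replace the Mockito and PowerMock test utilities with plain Java test
    doubles: the SerializableSupplier mocks in KafkaSourceReaderTest become
    lambdas that record invocations in an AtomicBoolean, and the Whitebox
    reflection in DynamicKafkaSourceReaderTest is replaced by a
    @VisibleForTesting getAvailabilityHelperSize() accessor on
    DynamicKafkaSourceReader.

    The replacement pattern, as a minimal sketch (test setup simplified;
    imports of AtomicBoolean, SerializableSupplier, and AssertJ's assertThat
    are assumed from the surrounding test class):

        // Hand-rolled double: a lambda that flags whether it was invoked,
        // replacing Mockito.mock(SerializableSupplier.class) and verify().
        AtomicBoolean called = new AtomicBoolean();
        SerializableSupplier<String> rackIdSupplier =
                () -> {
                    called.set(true);
                    return "use1-az1";
                };
        // ... drive the reader under test ...
        assertThat(called).isTrue(); // or isFalse() when no call is expected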
---
 .../c0d94764-76a0-4c50-b617-70b1754c4612           | 29 +++++++++++-----------
 .../source/reader/DynamicKafkaSourceReader.java    | 12 ++++++---
 .../reader/DynamicKafkaSourceReaderTest.java       | 14 +++--------
 .../kafka/source/reader/KafkaSourceReaderTest.java | 23 +++++++++++------
 4 files changed, 40 insertions(+), 38 deletions(-)

diff --git a/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 b/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612
index 20e55f4c..b96d3c6b 100644
--- a/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612
+++ b/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612
@@ -13,27 +13,26 @@ Method <org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource.getKa
 Method <org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager.close()> calls method <org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.close()> in (KafkaClusterMetricGroupManager.java:73)
 Method <org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager.close(java.lang.String)> calls method <org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.close()> in (KafkaClusterMetricGroupManager.java:62)
 Method <org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager.register(java.lang.String, org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroup)> checks instanceof <org.apache.flink.runtime.metrics.groups.AbstractMetricGroup> in (KafkaClusterMetricGroupManager.java:42)
-Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls constructor <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.<init>(int)> in (DynamicKafkaSourceReader.java:475)
-Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:474)
-Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:485)
-Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.getAvailabilityHelper()> has return type <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper> in (DynamicKafkaSourceReader.java:0)
-Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.getAvailabilityHelper()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DynamicKafkaSourceReader.java:0)
+Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls constructor <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.<init>(int)> in (DynamicKafkaSourceReader.java:479)
+Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:476)
+Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:489)
+Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.getAvailabilityHelperSize()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DynamicKafkaSourceReader.java:0)
 Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isActivelyConsumingSplits()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DynamicKafkaSourceReader.java:0)
-Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isAvailable()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:383)
-Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isAvailable()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.resetToUnAvailable()> in (DynamicKafkaSourceReader.java:381)
-Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int, java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:496)
+Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isAvailable()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:385)
+Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isAvailable()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.resetToUnAvailable()> in (DynamicKafkaSourceReader.java:383)
+Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int, java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:500)
 Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducerPool()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
 Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getBackchannel()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getCommittingProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:177)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:180)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:176)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:179)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:176)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:178)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:181)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:177)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:180)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:177)
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> has generic parameter type <org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.kafka.sink.KafkaCommittable>>> with type argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (KafkaSink.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.getKafkaProducerConfig()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSink.java:0)
-Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:153)
+Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:154)
 Method <org.apache.flink.connector.kafka.sink.KafkaWriter.getCurrentProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaWriter.java:0)
 Method <org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl.getProducers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ProducerPoolImpl.java:0)
 Method <org.apache.flink.connector.kafka.source.KafkaSource.createReader(org.apache.flink.api.connector.source.SourceReaderContext, java.util.function.Consumer)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
@@ -51,6 +50,6 @@ Method <org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getOffs
 Method <org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow(org.apache.flink.table.data.RowData, org.apache.flink.types.RowKind, [Lorg.apache.flink.table.data.RowData$FieldGetter;)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (DynamicKafkaRecordSerializationSchema.java:0)
 Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:520)
 Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:564)
-Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(org.apache.flink.table.connector.sink.DynamicTableSink$Context, org.apache.flink.table.connector.format.EncodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSink.java:401)
+Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(org.apache.flink.table.connector.sink.DynamicTableSink$Context, org.apache.flink.table.connector.format.EncodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSink.java:408)
 Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getFieldGetters(java.util.List, [I)> has return type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (KafkaDynamicSink.java:0)
 Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context, org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSource.java:574)
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
index ca745441..38952519 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
@@ -91,6 +91,7 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
     private final List<DynamicKafkaSourceSplit> pendingSplits;
 
     private MultipleFuturesAvailabilityHelper availabilityHelper;
+    private int availabilityHelperSize;
     private boolean isActivelyConsumingSplits;
     private boolean isNoMoreSplits;
     private AtomicBoolean restartingReaders;
@@ -110,7 +111,8 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
                         .addGroup(KafkaClusterMetricGroup.DYNAMIC_KAFKA_SOURCE_METRIC_GROUP);
         this.kafkaClusterMetricGroupManager = new KafkaClusterMetricGroupManager();
         this.pendingSplits = new ArrayList<>();
-        this.availabilityHelper = new MultipleFuturesAvailabilityHelper(0);
+        this.availabilityHelper =
+                new MultipleFuturesAvailabilityHelper(this.availabilityHelperSize = 0);
         this.isNoMoreSplits = false;
         this.isActivelyConsumingSplits = false;
         this.restartingReaders = new AtomicBoolean();
@@ -472,7 +474,9 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
      */
     private void completeAndResetAvailabilityHelper() {
         CompletableFuture<?> cachedPreviousFuture = availabilityHelper.getAvailableFuture();
-        availabilityHelper = new MultipleFuturesAvailabilityHelper(clusterReaderMap.size());
+        availabilityHelper =
+                new MultipleFuturesAvailabilityHelper(
+                        this.availabilityHelperSize = clusterReaderMap.size());
         syncAvailabilityHelperWithReaders();
 
         // We cannot immediately complete the previous future here. We must complete it only when
@@ -538,8 +542,8 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
     }
 
     @VisibleForTesting
-    public MultipleFuturesAvailabilityHelper getAvailabilityHelper() {
-        return availabilityHelper;
+    public int getAvailabilityHelperSize() {
+        return availabilityHelperSize;
     }
 
     @VisibleForTesting
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
index 5094e015..feedac8f 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java
@@ -39,7 +39,6 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.powermock.reflect.Whitebox;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -175,7 +174,7 @@ public class DynamicKafkaSourceReaderTest extends SourceReaderTestBase<DynamicKa
             assertThat(futureAtInit)
                     .as("future is not complete at fresh startup since no 
readers are created")
                     .isNotDone();
-            assertThat(getAvailabilityHelperSize(reader)).isZero();
+            assertThat(reader.getAvailabilityHelperSize()).isZero();
 
             reader.start();
             MetadataUpdateEvent metadata =
@@ -193,7 +192,7 @@ public class DynamicKafkaSourceReaderTest extends SourceReaderTestBase<DynamicKa
                     .as(
                             "New future should have been produced since 
metadata triggers reader creation")
                     .isNotSameAs(futureAfterSplitAssignment);
-            assertThat(getAvailabilityHelperSize(reader)).isEqualTo(2);
+            assertThat(reader.getAvailabilityHelperSize()).isEqualTo(2);
 
             // remove cluster 0
             KafkaStream kafkaStream = DynamicKafkaSourceTestHelper.getKafkaStream(TOPIC);
@@ -204,17 +203,10 @@ public class DynamicKafkaSourceReaderTest extends SourceReaderTestBase<DynamicKa
             assertThat(futureAfterRemovingCluster0)
                     .as("There should new future since the metadata has 
changed")
                     .isNotSameAs(futureAfterSplitAssignment);
-            assertThat(getAvailabilityHelperSize(reader)).isEqualTo(1);
+            assertThat(reader.getAvailabilityHelperSize()).isEqualTo(1);
         }
     }
 
-    private int getAvailabilityHelperSize(DynamicKafkaSourceReader<?> reader) {
-        return ((CompletableFuture<?>[])
-                        Whitebox.getInternalState(
-                                reader.getAvailabilityHelper(), "futuresToCombine"))
-                .length;
-    }
-
     @Test
     void testReaderMetadataChangeWhenOneTopicChanges() throws Exception {
         try (DynamicKafkaSourceReader<Integer> reader =
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index 5ad87ffc..60bce02c 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -54,7 +54,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -68,6 +67,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
@@ -82,8 +82,6 @@ import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderM
 import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_PARTITIONS;
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
 
 /** Unit tests for {@link KafkaSourceReader}. */
 public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSplit> {
@@ -508,7 +506,12 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
 
     @Test
     public void testThatReaderDoesNotCallRackIdSupplierOnInit() throws Exception {
-        SerializableSupplier<String> rackIdSupplier = Mockito.mock(SerializableSupplier.class);
+        AtomicBoolean called = new AtomicBoolean();
+        SerializableSupplier<String> rackIdSupplier =
+                () -> {
+                    called.set(true);
+                    return "dummy";
+                };
 
         try (KafkaSourceReader<Integer> reader =
                 (KafkaSourceReader<Integer>)
@@ -521,13 +524,17 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
             // Do nothing here
         }
 
-        verify(rackIdSupplier, never()).get();
+        assertThat(called).isFalse();
     }
 
     @Test
     public void testThatReaderDoesCallRackIdSupplierOnSplitAssignment() throws Exception {
-        SerializableSupplier<String> rackIdSupplier = Mockito.mock(SerializableSupplier.class);
-        Mockito.when(rackIdSupplier.get()).thenReturn("use1-az1");
+        AtomicBoolean called = new AtomicBoolean();
+        SerializableSupplier<String> rackIdSupplier =
+                () -> {
+                    called.set(true);
+                    return "use1-az1";
+                };
 
         try (KafkaSourceReader<Integer> reader =
                 (KafkaSourceReader<Integer>)
@@ -542,7 +549,7 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
                             new KafkaPartitionSplit(new TopicPartition(TOPIC, 1), 1L)));
         }
 
-        verify(rackIdSupplier).get();
+        assertThat(called).isTrue();
     }
 
     // ------------------------------------------
