This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit c1442ea8ad2df63361ab9bfbca1d910afc6ebd7d Author: Arvid Heise <ar...@apache.org> AuthorDate: Thu Apr 10 14:52:57 2025 +0200 [FLINK-37644] Remove mockito --- .../c0d94764-76a0-4c50-b617-70b1754c4612 | 29 +++++++++++----------- .../source/reader/DynamicKafkaSourceReader.java | 12 ++++++--- .../reader/DynamicKafkaSourceReaderTest.java | 14 +++-------- .../kafka/source/reader/KafkaSourceReaderTest.java | 23 +++++++++++------ 4 files changed, 40 insertions(+), 38 deletions(-) diff --git a/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 b/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 index 20e55f4c..b96d3c6b 100644 --- a/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 +++ b/flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612 @@ -13,27 +13,26 @@ Method <org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource.getKa Method <org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager.close()> calls method <org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.close()> in (KafkaClusterMetricGroupManager.java:73) Method <org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager.close(java.lang.String)> calls method <org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.close()> in (KafkaClusterMetricGroupManager.java:62) Method <org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager.register(java.lang.String, org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroup)> checks instanceof <org.apache.flink.runtime.metrics.groups.AbstractMetricGroup> in (KafkaClusterMetricGroupManager.java:42) -Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls constructor <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.<init>(int)> in (DynamicKafkaSourceReader.java:475) -Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:474) -Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:485) -Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.getAvailabilityHelper()> has return type <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper> in (DynamicKafkaSourceReader.java:0) -Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.getAvailabilityHelper()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DynamicKafkaSourceReader.java:0) +Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls constructor <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.<init>(int)> in (DynamicKafkaSourceReader.java:479) +Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:476) +Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.completeAndResetAvailabilityHelper()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:489) +Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.getAvailabilityHelperSize()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DynamicKafkaSourceReader.java:0) Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isActivelyConsumingSplits()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DynamicKafkaSourceReader.java:0) -Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isAvailable()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:383) -Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isAvailable()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.resetToUnAvailable()> in (DynamicKafkaSourceReader.java:381) -Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int, java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:496) +Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isAvailable()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.getAvailableFuture()> in (DynamicKafkaSourceReader.java:385) +Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isAvailable()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.resetToUnAvailable()> in (DynamicKafkaSourceReader.java:383) +Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int, java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:500) Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducerPool()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0) Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0) Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getBackchannel()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0) Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getCommittingProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0) -Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:177) -Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:180) -Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:176) -Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:179) -Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:176) +Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:178) +Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:181) +Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:177) +Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:180) +Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:177) Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> has generic parameter type <org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.kafka.sink.KafkaCommittable>>> with type argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (KafkaSink.java:0) Method <org.apache.flink.connector.kafka.sink.KafkaSink.getKafkaProducerConfig()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSink.java:0) -Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:153) +Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:154) Method <org.apache.flink.connector.kafka.sink.KafkaWriter.getCurrentProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaWriter.java:0) Method <org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl.getProducers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ProducerPoolImpl.java:0) Method <org.apache.flink.connector.kafka.source.KafkaSource.createReader(org.apache.flink.api.connector.source.SourceReaderContext, java.util.function.Consumer)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0) @@ -51,6 +50,6 @@ Method <org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.getOffs Method <org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow(org.apache.flink.table.data.RowData, org.apache.flink.types.RowKind, [Lorg.apache.flink.table.data.RowData$FieldGetter;)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (DynamicKafkaRecordSerializationSchema.java:0) Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldNames(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:520) Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:564) -Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(org.apache.flink.table.connector.sink.DynamicTableSink$Context, org.apache.flink.table.connector.format.EncodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSink.java:401) +Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(org.apache.flink.table.connector.sink.DynamicTableSink$Context, org.apache.flink.table.connector.format.EncodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSink.java:408) Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getFieldGetters(java.util.List, [I)> has return type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (KafkaDynamicSink.java:0) Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context, org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSource.java:574) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java index ca745441..38952519 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java @@ -91,6 +91,7 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka private final List<DynamicKafkaSourceSplit> pendingSplits; private MultipleFuturesAvailabilityHelper availabilityHelper; + private int availabilityHelperSize; private boolean isActivelyConsumingSplits; private boolean isNoMoreSplits; private AtomicBoolean restartingReaders; @@ -110,7 +111,8 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka .addGroup(KafkaClusterMetricGroup.DYNAMIC_KAFKA_SOURCE_METRIC_GROUP); this.kafkaClusterMetricGroupManager = new KafkaClusterMetricGroupManager(); this.pendingSplits = new ArrayList<>(); - this.availabilityHelper = new MultipleFuturesAvailabilityHelper(0); + this.availabilityHelper = + new MultipleFuturesAvailabilityHelper(this.availabilityHelperSize = 0); this.isNoMoreSplits = false; this.isActivelyConsumingSplits = false; this.restartingReaders = new AtomicBoolean(); @@ -472,7 +474,9 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka */ private void completeAndResetAvailabilityHelper() { CompletableFuture<?> cachedPreviousFuture = availabilityHelper.getAvailableFuture(); - availabilityHelper = new MultipleFuturesAvailabilityHelper(clusterReaderMap.size()); + availabilityHelper = + new MultipleFuturesAvailabilityHelper( + this.availabilityHelperSize = clusterReaderMap.size()); syncAvailabilityHelperWithReaders(); // We cannot immediately complete the previous future here. We must complete it only when @@ -538,8 +542,8 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka } @VisibleForTesting - public MultipleFuturesAvailabilityHelper getAvailabilityHelper() { - return availabilityHelper; + public int getAvailabilityHelperSize() { + return availabilityHelperSize; } @VisibleForTesting diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java index 5094e015..feedac8f 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java @@ -39,7 +39,6 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.powermock.reflect.Whitebox; import java.util.ArrayList; import java.util.Collections; @@ -175,7 +174,7 @@ public class DynamicKafkaSourceReaderTest extends SourceReaderTestBase<DynamicKa assertThat(futureAtInit) .as("future is not complete at fresh startup since no readers are created") .isNotDone(); - assertThat(getAvailabilityHelperSize(reader)).isZero(); + assertThat(reader.getAvailabilityHelperSize()).isZero(); reader.start(); MetadataUpdateEvent metadata = @@ -193,7 +192,7 @@ public class DynamicKafkaSourceReaderTest extends SourceReaderTestBase<DynamicKa .as( "New future should have been produced since metadata triggers reader creation") .isNotSameAs(futureAfterSplitAssignment); - assertThat(getAvailabilityHelperSize(reader)).isEqualTo(2); + assertThat(reader.getAvailabilityHelperSize()).isEqualTo(2); // remove cluster 0 KafkaStream kafkaStream = DynamicKafkaSourceTestHelper.getKafkaStream(TOPIC); @@ -204,17 +203,10 @@ public class DynamicKafkaSourceReaderTest extends SourceReaderTestBase<DynamicKa assertThat(futureAfterRemovingCluster0) .as("There should new future since the metadata has changed") .isNotSameAs(futureAfterSplitAssignment); - assertThat(getAvailabilityHelperSize(reader)).isEqualTo(1); + assertThat(reader.getAvailabilityHelperSize()).isEqualTo(1); } } - private int getAvailabilityHelperSize(DynamicKafkaSourceReader<?> reader) { - return ((CompletableFuture<?>[]) - Whitebox.getInternalState( - reader.getAvailabilityHelper(), "futuresToCombine")) - .length; - } - @Test void testReaderMetadataChangeWhenOneTopicChanges() throws Exception { try (DynamicKafkaSourceReader<Integer> reader = diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java index 5ad87ffc..60bce02c 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java @@ -54,7 +54,6 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import java.time.Duration; import java.util.ArrayList; @@ -68,6 +67,7 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Supplier; @@ -82,8 +82,6 @@ import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderM import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_PARTITIONS; import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; /** Unit tests for {@link KafkaSourceReader}. */ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSplit> { @@ -508,7 +506,12 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp @Test public void testThatReaderDoesNotCallRackIdSupplierOnInit() throws Exception { - SerializableSupplier<String> rackIdSupplier = Mockito.mock(SerializableSupplier.class); + AtomicBoolean called = new AtomicBoolean(); + SerializableSupplier<String> rackIdSupplier = + () -> { + called.set(true); + return "dummy"; + }; try (KafkaSourceReader<Integer> reader = (KafkaSourceReader<Integer>) @@ -521,13 +524,17 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp // Do nothing here } - verify(rackIdSupplier, never()).get(); + assertThat(called).isFalse(); } @Test public void testThatReaderDoesCallRackIdSupplierOnSplitAssignment() throws Exception { - SerializableSupplier<String> rackIdSupplier = Mockito.mock(SerializableSupplier.class); - Mockito.when(rackIdSupplier.get()).thenReturn("use1-az1"); + AtomicBoolean called = new AtomicBoolean(); + SerializableSupplier<String> rackIdSupplier = + () -> { + called.set(true); + return "use1-az1"; + }; try (KafkaSourceReader<Integer> reader = (KafkaSourceReader<Integer>) @@ -542,7 +549,7 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp new KafkaPartitionSplit(new TopicPartition(TOPIC, 1), 1L))); } - verify(rackIdSupplier).get(); + assertThat(called).isTrue(); } // ------------------------------------------