dannycranmer commented on a change in pull request #17189: URL: https://github.com/apache/flink/pull/17189#discussion_r704784987
########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ########## @@ -819,7 +819,7 @@ public void shutdownFetcher() { LOG.warn("Encountered exception closing record publisher factory", e); } } finally { - shardConsumersExecutor.shutdownNow(); + shardConsumersExecutor.shutdown(); Review comment: Are you sure this is correct? `shutdown()` [will not interrupt active shard consumers](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()). How will the running threads get interrupted? ########## File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java ########## @@ -35,36 +40,14 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; -import org.apache.flink.streaming.connectors.kinesis.testutils.AlwaysThrowsDeserializationSchema; -import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory; -import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; -import org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext; -import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; -import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher; -import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcherForShardConsumerException; +import org.apache.flink.streaming.connectors.kinesis.testutils.*; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; - -import com.amazonaws.services.kinesis.model.HashKeyRange; -import 
com.amazonaws.services.kinesis.model.SequenceNumberRange; -import com.amazonaws.services.kinesis.model.Shard; -import org.apache.commons.lang3.mutable.MutableBoolean; -import org.apache.commons.lang3.mutable.MutableLong; import org.junit.Assert; import org.junit.Test; import org.powermock.reflect.Whitebox; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Random; -import java.util.Set; -import java.util.UUID; +import java.util.*; Review comment: We should [not use wildcard imports](https://flink.apache.org/contributing/code-style-and-quality-formatting.html#imports) ########## File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java ########## @@ -73,14 +56,8 @@ import static java.util.Collections.singletonList; import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; Review comment: We should [not use wildcard imports](https://flink.apache.org/contributing/code-style-and-quality-formatting.html#imports) ########## File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java ########## @@ -35,36 +40,14 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface; import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; -import org.apache.flink.streaming.connectors.kinesis.testutils.AlwaysThrowsDeserializationSchema; -import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory; -import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; -import org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext; -import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; -import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher; -import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcherForShardConsumerException; +import org.apache.flink.streaming.connectors.kinesis.testutils.*; Review comment: We should [not use wildcard imports](https://flink.apache.org/contributing/code-style-and-quality-formatting.html#imports) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org