dannycranmer commented on a change in pull request #17189:
URL: https://github.com/apache/flink/pull/17189#discussion_r704784987



##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -819,7 +819,7 @@ public void shutdownFetcher() {
                 LOG.warn("Encountered exception closing record publisher 
factory", e);
             }
         } finally {
-            shardConsumersExecutor.shutdownNow();
+            shardConsumersExecutor.shutdown();

Review comment:
       Are you sure this is correct? `shutdown()` [will not interrupt active 
shard 
consumers](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()).
 How will the running threads get interrupted? 

##########
File path: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
##########
@@ -35,36 +40,14 @@
 import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
-import 
org.apache.flink.streaming.connectors.kinesis.testutils.AlwaysThrowsDeserializationSchema;
-import 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
-import 
org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
-import 
org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext;
-import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
-import 
org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
-import 
org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcherForShardConsumerException;
+import org.apache.flink.streaming.connectors.kinesis.testutils.*;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
-
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.SequenceNumberRange;
-import com.amazonaws.services.kinesis.model.Shard;
-import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.commons.lang3.mutable.MutableLong;
 import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;

Review comment:
       We should [not use wildcard 
imports](https://flink.apache.org/contributing/code-style-and-quality-formatting.html#imports)

##########
File path: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
##########
@@ -73,14 +56,8 @@
 
 import static java.util.Collections.singletonList;
 import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;

Review comment:
       We should [not use wildcard 
imports](https://flink.apache.org/contributing/code-style-and-quality-formatting.html#imports)

##########
File path: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
##########
@@ -35,36 +40,14 @@
 import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
-import 
org.apache.flink.streaming.connectors.kinesis.testutils.AlwaysThrowsDeserializationSchema;
-import 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
-import 
org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
-import 
org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext;
-import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
-import 
org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
-import 
org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcherForShardConsumerException;
+import org.apache.flink.streaming.connectors.kinesis.testutils.*;

Review comment:
       We should [not use wildcard 
imports](https://flink.apache.org/contributing/code-style-and-quality-formatting.html#imports)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to