dannycranmer commented on a change in pull request #13102:
URL: https://github.com/apache/flink/pull/13102#discussion_r483515110



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -30,15 +34,28 @@
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
 
+       /** An Asynchronous client used to communicate with AWS services. */
        private final KinesisAsyncClient kinesisAsyncClient;
 
        /**
-        * Create a new KinesisProxyV2 based on the supplied configuration properties.
+        * Create a new KinesisProxyV2 using the provided Async Client.
         *
         * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
         */
        public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
                this.kinesisAsyncClient = Preconditions.checkNotNull(kinesisAsyncClient);
        }
 
+       @Override
+       public CompletableFuture<Void> subscribeToShard(
+                       final SubscribeToShardRequest request,
+                       final SubscribeToShardResponseHandler responseHandler) {
+               return kinesisAsyncClient.subscribeToShard(request, responseHandler);
+       }
+
+       @Override
+       public void close() {
+               kinesisAsyncClient.close();

Review comment:
       @tzulitai I later realised that the Async HTTP Client used by this
client must also be closed explicitly. I have done this in the next PR
(registration); let me know if you would like me to pull that change back into
this PR. If the HTTP Client is not closed, the thread pool is not shut down
when the job is cancelled or fails.
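
For illustration, here is a minimal sketch of the ownership issue described above. It uses hypothetical stand-in classes (`StubAsyncHttpClient`, `StubServiceClient`, `ProxySketch`) rather than the AWS SDK types, and it is not the code from the follow-up PR. It assumes the service client does not close an HTTP client that was supplied to it, so whoever owns both has to close both:

```java
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for an async HTTP client that owns a thread pool. */
class StubAsyncHttpClient implements AutoCloseable {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    boolean isShutdown() {
        return pool.isShutdown();
    }

    @Override
    public void close() {
        // Releases the worker threads; without this call they outlive the job.
        pool.shutdown();
    }
}

/** Hypothetical stand-in for a service client built on a caller-supplied HTTP client. */
class StubServiceClient implements AutoCloseable {
    private boolean closed;

    StubServiceClient(final StubAsyncHttpClient httpClient) {
        Objects.requireNonNull(httpClient);
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        // Assumption: closing the service client leaves the supplied HTTP client open.
        closed = true;
    }
}

/** Sketch of a proxy that closes both resources it was given. */
class ProxySketch implements AutoCloseable {
    private final StubServiceClient serviceClient;
    private final StubAsyncHttpClient httpClient;

    ProxySketch(final StubServiceClient serviceClient, final StubAsyncHttpClient httpClient) {
        this.serviceClient = Objects.requireNonNull(serviceClient);
        this.httpClient = Objects.requireNonNull(httpClient);
    }

    @Override
    public void close() {
        serviceClient.close();
        // Explicitly close the HTTP client so its thread pool is shut down too.
        httpClient.close();
    }
}
```

Closing only `serviceClient` in this sketch would leave `StubAsyncHttpClient`'s pool running, which mirrors the thread pool problem noted in the comment.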





