dannycranmer commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r461532730



##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -362,6 +440,10 @@ protected KinesisDataFetcher(List<String> streams,
                this.watermarkTracker = watermarkTracker;
                this.kinesisProxyFactory = checkNotNull(kinesisProxyFactory);
                this.kinesis = kinesisProxyFactory.create(configProps);
+               this.kinesisProxyV2Factory = 
checkNotNull(kinesisProxyV2Factory);
+               if (shouldRegisterConsumerEagerly()) {

Review comment:
       @xiaolong-sn `EAGER` registration should be done in the 
`FlinkKinesisConsumer` constructor; `LAZY` registration should be done here

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -400,20 +482,51 @@ private RecordEmitter createRecordEmitter(Properties 
configProps) {
                SequenceNumber lastSequenceNum,
                MetricGroup metricGroup,
                KinesisDeserializationSchema<T> shardDeserializer) {
+               return createShardConsumer(
+                       subscribedShardStateIndex,
+                       subscribedShard,
+                       lastSequenceNum,
+                       metricGroup,
+                       shardDeserializer,
+                       null
+               );
+       }
+
+       /**
+        * Create a new shard consumer.
+        * Override this method to customize shard consumer behavior in 
subclasses.
+        * @param subscribedShardStateIndex the state index of the shard this 
consumer is subscribed to
+        * @param subscribedShard the shard this consumer is subscribed to
+        * @param lastSequenceNum the sequence number in the shard to start 
consuming
+        * @param metricGroup the metric group to report metrics to
+        * @param streamInfoList the stream info used for enhanced fan-out to 
consume from
+        * @return shard consumer
+        */
+       protected ShardConsumer<T> createShardConsumer(
+               Integer subscribedShardStateIndex,
+               StreamShardHandle subscribedShard,
+               SequenceNumber lastSequenceNum,
+               MetricGroup metricGroup,
+               KinesisDeserializationSchema<T> shardDeserializer,
+               @Nullable List<FanOutStreamInfo> streamInfoList) {

Review comment:
       @xiaolong-sn why do we need to pass all the stream info? A 
`ShardConsumer` is only concerned with a single stream

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisherConfiguration.java
##########
@@ -134,6 +134,46 @@
         */
        private final int listStreamConsumersMaxRetries;
 
+       /**
+        * Max retries for the describe stream operation.
+        */
+       private final int describeStreamMaxRetries;

Review comment:
       @xiaolong-sn it would make sense to move the `DescribeStream` bits to 
your config PR since that has not been reviewed yet

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when 
de-/registering streams.
+ */
+public class FanOutStreamInfo {
+       /** Kinesis stream name. */
+       private final String stream;
+
+       /** Kinesis stream arn. */
+       private final String streamArn;
+
+       /** Registered consumer name for the related stream. */
+       private final String consumerName;
+
+       /** Registered consumer arn for the related stream. */
+       private final String consumerArn;
+
+       /**
+        * Return the Kinesis stream name.
+        */
+       public String getStream() {
+               return stream;
+       }
+
+       /**
+        * Return the Kinesis stream arn.
+        */
+       public String getStreamArn() {
+               return streamArn;
+       }
+
+       /**
+        * Return the Kinesis consumer name for an enhanced fan-out consumer.
+        */
+       public String getConsumerName() {
+               return consumerName;
+       }
+
+       /**
+        * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+        */
+       public String getConsumerArn() {
+               return consumerArn;
+       }
+
+       /**
+        * Public constructor for fan out stream info.
+        */
+       public FanOutStreamInfo(String stream, String streamArn, String 
consumerName, String consumerArn) {

Review comment:
       @xiaolong-sn nit: constructors should go above methods

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+       private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+       /** Random seed used to calculate backoff jitter for Kinesis 
operations. */
+       private static final Random seed = new Random();
 
        private final KinesisAsyncClient kinesisAsyncClient;
 
+       private final FanOutRecordPublisherConfiguration 
fanOutRecordPublisherConfiguration;
+
        /**
         * Create a new KinesisProxyV2 based on the supplied configuration 
properties.
         *
-        * @param kinesisAsyncClient the kinesis async client used to 
communicate with Kinesis
+        * @param configProps configuration properties containing AWS 
credential and AWS region info
+        */
+       public KinesisProxyV2(final Properties configProps, List<String> 
streams) {
+               this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+               this.fanOutRecordPublisherConfiguration = new 
FanOutRecordPublisherConfiguration(configProps, streams);
+       }
+
+       /**
+        * Creates a Kinesis proxy V2.
+        *
+        * @param configProps configuration properties
+        * @param streams list of kinesis stream names
+        * @return the created kinesis proxy v2
+        */
+       public static KinesisProxyV2Interface create(Properties configProps, 
List<String> streams) {
+               return new KinesisProxyV2(configProps, streams);
+       }
+
+       /**
+        * Create the Kinesis client, using the provided configuration 
properties.
+        * Derived classes can override this method to customize the client 
configuration.
+        *
+        * @param configProps the properties map used to create the Kinesis 
Client
+        * @return a Kinesis Client
+        */
+       protected KinesisAsyncClient createKinesisAsyncClient(final Properties 
configProps) {
+               final ClientConfiguration config = new 
ClientConfigurationFactory().getConfig();
+               return AwsV2Util.createKinesisAsyncClient(configProps, config);
+       }
+
+       /**
+        * {@inheritDoc}
         */
-       public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {

Review comment:
       @xiaolong-sn I had changed this class so that it does not construct its 
dependencies, in line with the Dependency Inversion (SOLID) principle 
(https://github.com/apache/flink/pull/13005/commits/b7e8c0631ff1484905ff1ecf8f89a8b81036bc1a)
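
   A minimal sketch of the constructor-injection shape this comment refers to. 
   `StreamArnClient` and `InjectedProxySketch` are hypothetical stand-ins for 
   illustration only; they are not the AWS SDK client or the real 
   `KinesisProxyV2`:

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the async client; not an AWS SDK type. */
interface StreamArnClient {
    CompletableFuture<String> describeStreamArn(String streamName);
}

/** Proxy that receives its collaborator instead of building it from Properties. */
final class InjectedProxySketch {
    private final StreamArnClient client;

    InjectedProxySketch(StreamArnClient client) {
        this.client = Objects.requireNonNull(client, "client");
    }

    /** Blocks until the ARN is available; join() rethrows failures unchecked. */
    String streamArn(String streamName) {
        return client.describeStreamArn(streamName).join();
    }
}
```

   With this shape a test can pass a stub client (for example a lambda that 
   returns a completed future) without subclassing the proxy.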
   

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+       private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+       /** Random seed used to calculate backoff jitter for Kinesis 
operations. */
+       private static final Random seed = new Random();
 
        private final KinesisAsyncClient kinesisAsyncClient;
 
+       private final FanOutRecordPublisherConfiguration 
fanOutRecordPublisherConfiguration;
+
        /**
         * Create a new KinesisProxyV2 based on the supplied configuration 
properties.
         *
-        * @param kinesisAsyncClient the kinesis async client used to 
communicate with Kinesis
+        * @param configProps configuration properties containing AWS 
credential and AWS region info
+        */
+       public KinesisProxyV2(final Properties configProps, List<String> 
streams) {
+               this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+               this.fanOutRecordPublisherConfiguration = new 
FanOutRecordPublisherConfiguration(configProps, streams);
+       }
+
+       /**
+        * Creates a Kinesis proxy V2.
+        *
+        * @param configProps configuration properties
+        * @param streams list of kinesis stream names
+        * @return the created kinesis proxy v2
+        */
+       public static KinesisProxyV2Interface create(Properties configProps, 
List<String> streams) {
+               return new KinesisProxyV2(configProps, streams);
+       }
+
+       /**
+        * Create the Kinesis client, using the provided configuration 
properties.
+        * Derived classes can override this method to customize the client 
configuration.
+        *
+        * @param configProps the properties map used to create the Kinesis 
Client
+        * @return a Kinesis Client
+        */
+       protected KinesisAsyncClient createKinesisAsyncClient(final Properties 
configProps) {
+               final ClientConfiguration config = new 
ClientConfigurationFactory().getConfig();
+               return AwsV2Util.createKinesisAsyncClient(configProps, config);
+       }
+
+       /**
+        * {@inheritDoc}
         */
-       public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-               this.kinesisAsyncClient = kinesisAsyncClient;
+       @Override
+       public Map<String, String> describeStream(List<String> streams) throws 
InterruptedException, ExecutionException {
+               Map<String, String> result = new HashMap<>();
+               for (String stream : streams) {
+                       DescribeStreamRequest describeStreamRequest = 
DescribeStreamRequest
+                               .builder()
+                               .streamName(stream)
+                               .build();
+                       DescribeStreamResponse describeStreamResponse = null;
+
+                       int retryCount = 0;
+                       while (retryCount <= 
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && 
describeStreamResponse == null) {
+                               try {
+                                       describeStreamResponse = 
kinesisAsyncClient.describeStream(describeStreamRequest).get();
+                               } catch (ExecutionException ex) {
+                                       if 
(AwsV2Util.isRecoverableException(ex)) {
+                                               long backoffMillis = 
fullJitterBackoff(
+                                                       
fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), 
fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), 
fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), 
retryCount++);
+                                               LOG.warn("Got recoverable 
AmazonServiceException when trying to describe stream " + stream + ". Backing 
off for "
+                                                       + backoffMillis + " 
millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+                                               Thread.sleep(backoffMillis);
+                                       } else {
+                                               throw ex;
+                                       }
+                               }
+                       }
+                       if (describeStreamResponse == null) {
+                               throw new RuntimeException("Retries exceeded 
for describeStream operation - all " + 
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+                                       " retry attempts failed.");
+                       }
+                       result.put(stream, 
describeStreamResponse.streamDescription().streamARN());
+               }
+               return result;
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public List<FanOutStreamInfo> registerStreamConsumer(Map<String, 
String> streamArns) throws InterruptedException, ExecutionException {
+               
Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+               String consumerName = 
fanOutRecordPublisherConfiguration.getConsumerName().get();
+               List<FanOutStreamInfo> result = new ArrayList<>();
+               for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+                       String stream = entry.getKey();
+                       String streamArn = entry.getValue();
+                       RegisterStreamConsumerRequest 
registerStreamConsumerRequest = RegisterStreamConsumerRequest
+                               .builder()
+                               .consumerName(consumerName)
+                               .streamARN(streamArn)
+                               .build();
+                       FanOutStreamInfo fanOutStreamInfo = null;
+                       int retryCount = 0;
+                       while (retryCount <= 
fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() && 
fanOutStreamInfo == null) {
+                               try {
+                                       RegisterStreamConsumerResponse 
registerStreamConsumerResponse = 
kinesisAsyncClient.registerStreamConsumer(registerStreamConsumerRequest).get();
+                                       fanOutStreamInfo = new 
FanOutStreamInfo(stream, streamArn, consumerName, 
registerStreamConsumerResponse.consumer().consumerARN());
+                               } catch (ExecutionException ex) {
+                                       if (AwsV2Util.isResourceInUse(ex)) {
+                                               fanOutStreamInfo = 
describeStreamConsumer(stream, streamArn, consumerName);
+                                       } else if 
(AwsV2Util.isRecoverableException(ex)) {
+                                               long backoffMillis = 
fullJitterBackoff(
+                                                       
fanOutRecordPublisherConfiguration.getRegisterStreamBaseBackoffMillis(), 
fanOutRecordPublisherConfiguration.getRegisterStreamMaxBackoffMillis(), 
fanOutRecordPublisherConfiguration.getRegisterStreamExpConstant(), 
retryCount++);
+                                               LOG.warn("Got recoverable 
AmazonServiceException when trying to register " + stream + ". Backing off for "
+                                                       + backoffMillis + " 
millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+                                               Thread.sleep(backoffMillis);
+                                       } else {
+                                               throw ex;
+                                       }
+                               }
+                       }
+
+                       if (fanOutStreamInfo == null) {
+                               throw new RuntimeException("Retries exceeded 
for registerStream operation - all " + 
fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() +
+                                       " retry attempts failed.");
+                       }
+                       result.add(fanOutStreamInfo);
+               }
+               return result;
+       }
+
+       public FanOutStreamInfo describeStreamConsumer(String stream, String 
streamArn, String consumerName) throws InterruptedException, ExecutionException 
 {
+               DescribeStreamConsumerRequest describeStreamConsumerRequest = 
DescribeStreamConsumerRequest
+                       .builder()
+                       .streamARN(streamArn)
+                       .consumerName(consumerName)
+                       .build();
+               FanOutStreamInfo fanOutStreamInfo = null;
+               int retryCount = 0;
+               while (retryCount <= 
fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxRetries() && 
fanOutStreamInfo == null) {
+                       try {
+                               DescribeStreamConsumerResponse 
describeStreamConsumerResponse = 
kinesisAsyncClient.describeStreamConsumer(describeStreamConsumerRequest).get();
+                               fanOutStreamInfo = new FanOutStreamInfo(stream, 
streamArn, consumerName, 
describeStreamConsumerResponse.consumerDescription().consumerARN());
+                       } catch (ExecutionException ex) {
+                               if (AwsV2Util.isRecoverableException(ex)) {
+                                       long backoffMillis = fullJitterBackoff(
+                                               
fanOutRecordPublisherConfiguration.getDescribeStreamConsumerBaseBackoffMillis(),
 
fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxBackoffMillis(), 
fanOutRecordPublisherConfiguration.getDescribeStreamConsumerExpConstant(), 
retryCount++);
+                                       LOG.warn("Got recoverable 
AmazonServiceException when trying to describe stream consumer " + stream + ". 
Backing off for "
+                                               + backoffMillis + " millis (" + 
ex.getClass().getName() + ": " + ex.getMessage() + ")");
+                                       Thread.sleep(backoffMillis);
+                               } else {
+                                       throw ex;
+                               }
+                       }
+               }
+
+               if (fanOutStreamInfo == null) {
+                       throw new RuntimeException("Retries exceeded for 
registerStream operation - all " + 
fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() +
+                               " retry attempts failed.");
+               }
+               return fanOutStreamInfo;
        }
 
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public void deregisterStreamConsumer(List<FanOutStreamInfo> 
fanOutStreamInfos) throws InterruptedException, ExecutionException {

Review comment:
       @xiaolong-sn again, please make this singular (a single deregistration 
at a time)
   
   Also, this needs to open with a describe call and stagger, as per the FLIP 
flow chart, to deal with competing requests
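
   One possible reading of this comment, sketched under assumptions: "stagger" 
   is taken to mean a randomized (full-jitter) delay, and "open with describe" 
   is taken to mean checking that the consumer still exists before 
   deregistering. The FLIP flow chart is not reproduced here, so the exact flow 
   may differ. `StaggeredDeregistrationSketch` and its parameters are 
   hypothetical names:

```java
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class StaggeredDeregistrationSketch {
    private static final Random RANDOM = new Random();

    /** Full-jitter backoff: uniform in [0, min(maxMillis, baseMillis * power^attempt)). */
    static long fullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt) {
        long ceiling = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
        return (long) (RANDOM.nextDouble() * ceiling);
    }

    /**
     * Deregisters a single consumer. Waits for the stagger delay, then describes
     * first so that a consumer already removed by a competing task is skipped.
     *
     * @return true if a deregister call was issued
     */
    static boolean deregisterOne(
            String consumerArn,
            Predicate<String> consumerExists, // assumed wrapper around a describe call
            Consumer<String> deregister,      // assumed wrapper around a deregister call
            long staggerMillis) {
        try {
            Thread.sleep(staggerMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while staggering deregistration", e);
        }
        if (!consumerExists.test(consumerArn)) {
            return false;
        }
        deregister.accept(consumerArn);
        return true;
    }
}
```

   The caller would loop over its streams and invoke `deregisterOne` once per 
   consumer, passing a delay obtained from `fullJitterBackoff`.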

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -400,20 +482,51 @@ private RecordEmitter createRecordEmitter(Properties 
configProps) {
                SequenceNumber lastSequenceNum,
                MetricGroup metricGroup,
                KinesisDeserializationSchema<T> shardDeserializer) {
+               return createShardConsumer(
+                       subscribedShardStateIndex,
+                       subscribedShard,
+                       lastSequenceNum,
+                       metricGroup,
+                       shardDeserializer,
+                       null
+               );
+       }
+
+       /**
+        * Create a new shard consumer.
+        * Override this method to customize shard consumer behavior in 
subclasses.
+        * @param subscribedShardStateIndex the state index of the shard this 
consumer is subscribed to
+        * @param subscribedShard the shard this consumer is subscribed to
+        * @param lastSequenceNum the sequence number in the shard to start 
consuming
+        * @param metricGroup the metric group to report metrics to
+        * @param streamInfoList the stream info used for enhanced fan-out to 
consume from
+        * @return shard consumer
+        */
+       protected ShardConsumer<T> createShardConsumer(
+               Integer subscribedShardStateIndex,
+               StreamShardHandle subscribedShard,
+               SequenceNumber lastSequenceNum,
+               MetricGroup metricGroup,
+               KinesisDeserializationSchema<T> shardDeserializer,
+               @Nullable List<FanOutStreamInfo> streamInfoList) {
 
                final KinesisProxyInterface kinesis = 
kinesisProxyFactory.create(configProps);
 
                final RecordPublisher recordPublisher = new 
PollingRecordPublisherFactory()
                        .create(configProps, metricGroup, subscribedShard, 
kinesis);
 
-               return new ShardConsumer<>(
+               return new ShardConsumer<T>(
                        this,
                        recordPublisher,
                        subscribedShardStateIndex,
                        subscribedShard,
                        lastSequenceNum,
                        new ShardConsumerMetricsReporter(metricGroup),
-                       shardDeserializer);
+                       shardDeserializer,
+                       configProps,

Review comment:
       @xiaolong-sn all these additional properties should not be required once 
you move `LAZY`. Remember we have a single stream consumer per stream, not per 
shard, so we should perform `LAZY` registration in the `KinesisDataFetcher` 
(once per task manager)

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+       private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+       /** Random seed used to calculate backoff jitter for Kinesis 
operations. */
+       private static final Random seed = new Random();
 
        private final KinesisAsyncClient kinesisAsyncClient;
 
+       private final FanOutRecordPublisherConfiguration 
fanOutRecordPublisherConfiguration;
+
        /**
         * Create a new KinesisProxyV2 based on the supplied configuration 
properties.
         *
-        * @param kinesisAsyncClient the kinesis async client used to 
communicate with Kinesis
+        * @param configProps configuration properties containing AWS 
credential and AWS region info
+        */
+       public KinesisProxyV2(final Properties configProps, List<String> 
streams) {
+               this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+               this.fanOutRecordPublisherConfiguration = new 
FanOutRecordPublisherConfiguration(configProps, streams);
+       }
+
+       /**
+        * Creates a Kinesis proxy V2.
+        *
+        * @param configProps configuration properties
+        * @param streams list of kinesis stream names
+        * @return the created kinesis proxy v2
+        */
+       public static KinesisProxyV2Interface create(Properties configProps, 
List<String> streams) {
+               return new KinesisProxyV2(configProps, streams);
+       }
+
+       /**
+        * Create the Kinesis client, using the provided configuration 
properties.
+        * Derived classes can override this method to customize the client 
configuration.
+        *
+        * @param configProps the properties map used to create the Kinesis 
Client
+        * @return a Kinesis Client
+        */
+       protected KinesisAsyncClient createKinesisAsyncClient(final Properties 
configProps) {
+               final ClientConfiguration config = new 
ClientConfigurationFactory().getConfig();
+               return AwsV2Util.createKinesisAsyncClient(configProps, config);
+       }
+
+       /**
+        * {@inheritDoc}
         */
-       public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-               this.kinesisAsyncClient = kinesisAsyncClient;
+       @Override
+       public Map<String, String> describeStream(List<String> streams) throws 
InterruptedException, ExecutionException {
+               Map<String, String> result = new HashMap<>();
+               for (String stream : streams) {
+                       DescribeStreamRequest describeStreamRequest = 
DescribeStreamRequest
+                               .builder()
+                               .streamName(stream)
+                               .build();
+                       DescribeStreamResponse describeStreamResponse = null;
+
+                       int retryCount = 0;
+                       while (retryCount <= 
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && 
describeStreamResponse == null) {
+                               try {
+                                       describeStreamResponse = 
kinesisAsyncClient.describeStream(describeStreamRequest).get();
+                               } catch (ExecutionException ex) {
+                                       if 
(AwsV2Util.isRecoverableException(ex)) {
+                                               long backoffMillis = 
fullJitterBackoff(
+                                                       
fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), 
fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), 
fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), 
retryCount++);
+                                               LOG.warn("Got recoverable 
AmazonServiceException when trying to describe stream " + stream + ". Backing 
off for "
+                                                       + backoffMillis + " 
millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+                                               Thread.sleep(backoffMillis);
+                                       } else {
+                                               throw ex;
+                                       }
+                               }
+                       }
+                       if (describeStreamResponse == null) {
+                               throw new RuntimeException("Retries exceeded 
for describeStream operation - all " + 
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+                                       " retry attempts failed.");
+                       }
+                       result.put(stream, 
describeStreamResponse.streamDescription().streamARN());
+               }
+               return result;
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public List<FanOutStreamInfo> registerStreamConsumer(Map<String, 
String> streamArns) throws InterruptedException, ExecutionException {

Review comment:
       @xiaolong-sn again, I think this should accept a single 
streamArn/consumerName and loop externally
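
   A minimal sketch of the suggested shape, assuming a proxy method that takes 
   one stream ARN and one consumer name and returns the consumer ARN. 
   `SingleConsumerRegistrar` and `RegistrationLoopSketch` are hypothetical 
   names, not part of the connector:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-stream registration call; returns the consumer ARN. */
interface SingleConsumerRegistrar {
    String registerStreamConsumer(String streamArn, String consumerName);
}

final class RegistrationLoopSketch {
    /** The caller owns the loop; the registrar only ever sees one stream at a time. */
    static Map<String, String> registerAll(
            SingleConsumerRegistrar registrar,
            List<String> streamArns,
            String consumerName) {
        Map<String, String> consumerArnsByStreamArn = new LinkedHashMap<>();
        for (String streamArn : streamArns) {
            consumerArnsByStreamArn.put(
                streamArn, registrar.registerStreamConsumer(streamArn, consumerName));
        }
        return consumerArnsByStreamArn;
    }
}
```

   Keeping the proxy method singular means retry and backoff state applies to 
   one request, and a failure for one stream does not have to be unpicked from 
   a partially built result list.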

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when 
de-/registering streams.
+ */
+public class FanOutStreamInfo {
+       /** Kinesis stream name. */
+       private final String stream;
+
+       /** Kinesis stream arn. */
+       private final String streamArn;
+
+       /** Registered consumer name for the related stream. */
+       private final String consumerName;
+
+       /** Registered consumer arn for the related stream. */
+       private final String consumerArn;
+
+       /**
+        * Return the Kinesis stream name.
+        */
+       public String getStream() {
+               return stream;
+       }
+
+       /**
+        * Return the Kinesis stream arn.
+        */
+       public String getStreamArn() {
+               return streamArn;
+       }
+
+       /**
+        * Return the Kinesis consumer name for an enhanced fan-out consumer.
+        */
+       public String getConsumerName() {
+               return consumerName;
+       }
+
+       /**
+        * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+        */
+       public String getConsumerArn() {
+               return consumerArn;
+       }
+
+       /**
+        * Public constructor for fan out stream info.
+        */
+       public FanOutStreamInfo(String stream, String streamArn, String 
consumerName, String consumerArn) {
+               this.stream = stream;
+               this.streamArn = streamArn;
+               this.consumerName = consumerName;
+               this.consumerArn = consumerArn;
+       }
+
+       @Override
+       public boolean equals(Object o) {

Review comment:
       @xiaolong-sn do we need equals/hashCode for this object?

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -147,9 +228,31 @@ public void run() {
                        }
                } catch (Throwable t) {
                        fetcherRef.stopWithError(t);
+               } finally {
+                       if (this.streamInfoList != null) {
+                               try {
+                                       deregisterStreamConsumer();
+                               } catch (Throwable t) {
+                                       fetcherRef.stopWithError(t);
+                               }
+                       }
                }
        }
 
+       private void deregisterStreamConsumer() throws ExecutionException, 
InterruptedException {

Review comment:
       @xiaolong-sn this needs to move to `KinesisDataFetcher::shutdownFetcher` 
   
   

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -22,21 +22,29 @@
 import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

Review comment:
       @xiaolong-sn I was not expecting any changes to `ShardConsumer`; once 
you move `LAZY` to `KinesisDataFetcher`, this should be unchanged

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when 
de-/registering streams.
+ */
+public class FanOutStreamInfo {
+       /** Kinesis stream name. */
+       private final String stream;
+
+       /** Kinesis stream arn. */
+       private final String streamArn;
+
+       /** Registered consumer name for the related stream. */
+       private final String consumerName;

Review comment:
       @xiaolong-sn Immutable, nice +1

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
##########
@@ -248,4 +254,29 @@ static AwsCredentialsProvider 
getWebIdentityTokenFileCredentialsProvider(
        public static Region getRegion(final Properties configProps) {
                return 
Region.of(configProps.getProperty(AWSConfigConstants.AWS_REGION));
        }
+
+       /**
+        * Whether or not an exception is recoverable.
+        */
+       public static boolean isRecoverableException(ExecutionException e) {
+               if (!(e.getCause() instanceof SdkException)) {
+                       return false;
+               }
+               SdkException ase = (SdkException) e.getCause();
+               return ase instanceof LimitExceededException || ase instanceof 
ProvisionedThroughputExceededException || ase instanceof ResourceInUseException;

Review comment:
       @xiaolong-sn `ase instanceof ResourceInUseException` can be replaced 
with `isResourceInUse(ase)`
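
   Roughly like this. The exception classes below are local stand-ins so the sketch compiles on its own, and I am assuming `isResourceInUse` takes a `Throwable`; adjust to the real signature:

```java
import java.util.concurrent.ExecutionException;

// Local stand-ins for the AWS SDK v2 exception types (assumption: same hierarchy shape).
class SdkException extends RuntimeException {
    SdkException(String message) { super(message); }
}
class LimitExceededException extends SdkException {
    LimitExceededException(String message) { super(message); }
}
class ProvisionedThroughputExceededException extends SdkException {
    ProvisionedThroughputExceededException(String message) { super(message); }
}
class ResourceInUseException extends SdkException {
    ResourceInUseException(String message) { super(message); }
}

final class RecoverableSketch {
    /** Assumed shape of the existing helper referred to in the comment above. */
    static boolean isResourceInUse(Throwable t) {
        return t instanceof ResourceInUseException;
    }

    /** Same checks as in the diff, with the last one delegated to the helper. */
    static boolean isRecoverableException(ExecutionException e) {
        Throwable cause = e.getCause();
        if (!(cause instanceof SdkException)) {
            return false;
        }
        return cause instanceof LimitExceededException
            || cause instanceof ProvisionedThroughputExceededException
            || isResourceInUse(cause);
    }
}
```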

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+       private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+       /** Random seed used to calculate backoff jitter for Kinesis 
operations. */
+       private static final Random seed = new Random();
 
        private final KinesisAsyncClient kinesisAsyncClient;
 
+       private final FanOutRecordPublisherConfiguration 
fanOutRecordPublisherConfiguration;
+
        /**
         * Create a new KinesisProxyV2 based on the supplied configuration 
properties.
         *
-        * @param kinesisAsyncClient the kinesis async client used to 
communicate with Kinesis
+        * @param configProps configuration properties containing AWS 
credential and AWS region info
+        */
+       public KinesisProxyV2(final Properties configProps, List<String> 
streams) {
+               this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+               this.fanOutRecordPublisherConfiguration = new 
FanOutRecordPublisherConfiguration(configProps, streams);
+       }
+
+       /**
+        * Creates a Kinesis proxy V2.
+        *
+        * @param configProps configuration properties
+        * @param streams list of kinesis stream names
+        * @return the created kinesis proxy v2
+        */
+       public static KinesisProxyV2Interface create(Properties configProps, 
List<String> streams) {
+               return new KinesisProxyV2(configProps, streams);
+       }
+
+       /**
+        * Create the Kinesis client, using the provided configuration 
properties.
+        * Derived classes can override this method to customize the client 
configuration.
+        *
+        * @param configProps the properties map used to create the Kinesis 
Client
+        * @return a Kinesis Client
+        */
+       protected KinesisAsyncClient createKinesisAsyncClient(final Properties 
configProps) {
+               final ClientConfiguration config = new 
ClientConfigurationFactory().getConfig();
+               return AwsV2Util.createKinesisAsyncClient(configProps, config);
+       }
+
+       /**
+        * {@inheritDoc}
         */
-       public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-               this.kinesisAsyncClient = kinesisAsyncClient;
+       @Override
+       public Map<String, String> describeStream(List<String> streams) throws 
InterruptedException, ExecutionException {

Review comment:
       @xiaolong-sn I feel like we are throwing away a lot of potentially useful 
information here. Why not return `DescribeStreamResponse` instead and keep it 
generic? The KinesisProxyV1 returns SDK objects.

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when 
de-/registering streams.
+ */
+public class FanOutStreamInfo {

Review comment:
       @xiaolong-sn This should be renamed to `FanOutStreamConsumerInfo` please

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutStreamInfo.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import java.util.Objects;
+
+/**
+ * This is a data class which describe all related information when 
de-/registering streams.
+ */
+public class FanOutStreamInfo {
+       /** Kinesis stream name. */
+       private final String stream;
+
+       /** Kinesis stream arn. */
+       private final String streamArn;
+
+       /** Registered consumer name for the related stream. */
+       private final String consumerName;
+
+       /** Registered consumer arn for the related stream. */
+       private final String consumerArn;
+
+       /**
+        * Return the Kinesis stream name.
+        */
+       public String getStream() {
+               return stream;
+       }
+
+       /**
+        * Return the Kinesis stream arn.
+        */
+       public String getStreamArn() {
+               return streamArn;
+       }
+
+       /**
+        * Return the Kinesis consumer name for an enhanced fan-out consumer.
+        */
+       public String getConsumerName() {
+               return consumerName;
+       }
+
+       /**
+        * Return the Kinesis consumer arn for an enhanced fan-out consumer.
+        */
+       public String getConsumerArn() {
+               return consumerArn;
+       }
+
+       /**
+        * Public constructor for fan out stream info.

Review comment:
       @xiaolong-sn the constructor Javadoc is missing `@param` tags for its parameters
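
   For example, something along these lines. The parameter descriptions are only suggestions based on the field comments in this class, and the class is made package-private here so the snippet stands alone:

```java
/** Data class describing a stream and its registered enhanced fan-out consumer. */
class FanOutStreamInfo {
    private final String stream;
    private final String streamArn;
    private final String consumerName;
    private final String consumerArn;

    /**
     * Create a new fan-out stream info.
     *
     * @param stream the Kinesis stream name
     * @param streamArn the Kinesis stream ARN
     * @param consumerName the registered consumer name for the stream
     * @param consumerArn the registered consumer ARN for the stream
     */
    public FanOutStreamInfo(String stream, String streamArn, String consumerName, String consumerArn) {
        this.stream = stream;
        this.streamArn = streamArn;
        this.consumerName = consumerName;
        this.consumerArn = consumerArn;
    }

    public String getStream() { return stream; }

    public String getStreamArn() { return streamArn; }

    public String getConsumerName() { return consumerName; }

    public String getConsumerArn() { return consumerArn; }
}
```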

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+       private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+       /** Random seed used to calculate backoff jitter for Kinesis 
operations. */
+       private static final Random seed = new Random();
 
        private final KinesisAsyncClient kinesisAsyncClient;
 
+       private final FanOutRecordPublisherConfiguration 
fanOutRecordPublisherConfiguration;
+
        /**
         * Create a new KinesisProxyV2 based on the supplied configuration 
properties.
         *
-        * @param kinesisAsyncClient the kinesis async client used to 
communicate with Kinesis
+        * @param configProps configuration properties containing AWS 
credential and AWS region info
+        */
+       public KinesisProxyV2(final Properties configProps, List<String> 
streams) {
+               this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+               this.fanOutRecordPublisherConfiguration = new 
FanOutRecordPublisherConfiguration(configProps, streams);
+       }
+
+       /**
+        * Creates a Kinesis proxy V2.
+        *
+        * @param configProps configuration properties
+        * @param streams list of kinesis stream names
+        * @return the created kinesis proxy v2
+        */
+       public static KinesisProxyV2Interface create(Properties configProps, 
List<String> streams) {
+               return new KinesisProxyV2(configProps, streams);
+       }
+
+       /**
+        * Create the Kinesis client, using the provided configuration 
properties.
+        * Derived classes can override this method to customize the client 
configuration.
+        *
+        * @param configProps the properties map used to create the Kinesis 
Client
+        * @return a Kinesis Client
+        */
+       protected KinesisAsyncClient createKinesisAsyncClient(final Properties 
configProps) {
+               final ClientConfiguration config = new 
ClientConfigurationFactory().getConfig();
+               return AwsV2Util.createKinesisAsyncClient(configProps, config);
+       }
+
+       /**
+        * {@inheritDoc}
         */
-       public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-               this.kinesisAsyncClient = kinesisAsyncClient;
+       @Override
+       public Map<String, String> describeStream(List<String> streams) throws 
InterruptedException, ExecutionException {
+               Map<String, String> result = new HashMap<>();
+               for (String stream : streams) {
+                       DescribeStreamRequest describeStreamRequest = 
DescribeStreamRequest
+                               .builder()
+                               .streamName(stream)
+                               .build();
+                       DescribeStreamResponse describeStreamResponse = null;
+
+                       int retryCount = 0;
+                       while (retryCount <= 
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && 
describeStreamResponse == null) {
+                               try {
+                                       describeStreamResponse = 
kinesisAsyncClient.describeStream(describeStreamRequest).get();
+                               } catch (ExecutionException ex) {
+                                       if 
(AwsV2Util.isRecoverableException(ex)) {
+                                               long backoffMillis = 
fullJitterBackoff(
+                                                       
fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), 
fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), 
fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), 
retryCount++);
+                                               LOG.warn("Got recoverable 
AmazonServiceException when trying to describe stream " + stream + ". Backing 
off for "
+                                                       + backoffMillis + " 
millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+                                               Thread.sleep(backoffMillis);
+                                       } else {
+                                               throw ex;
+                                       }
+                               }
+                       }
+                       if (describeStreamResponse == null) {
+                               throw new RuntimeException("Retries exceeded 
for describeStream operation - all " + 
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+                                       " retry attempts failed.");
+                       }
+                       result.put(stream, 
describeStreamResponse.streamDescription().streamARN());
+               }
+               return result;
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public List<FanOutStreamInfo> registerStreamConsumer(Map<String, 
String> streamArns) throws InterruptedException, ExecutionException {
+               
Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+               String consumerName = 
fanOutRecordPublisherConfiguration.getConsumerName().get();
+               List<FanOutStreamInfo> result = new ArrayList<>();
+               for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+                       String stream = entry.getKey();
+                       String streamArn = entry.getValue();
+                       RegisterStreamConsumerRequest 
registerStreamConsumerRequest = RegisterStreamConsumerRequest

Review comment:
       @xiaolong-sn what happens if the stream consumer is already registered? 
The flowchart in the FLIP has a call to `ListStreamConsumers` to establish 
whether it is already registered. Also, in the `LAZY` case we are meant to 
introduce a jitter delay up front. Please refer to the flowcharts in the 
[FLIP](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers)
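
   To make the jitter point concrete, here is a minimal sketch. It assumes the usual full-jitter formula for a helper with the same parameters as the `fullJitterBackoff` call in this diff, and `initialJitterMillis` with its bound is a hypothetical name; the FLIP flowcharts remain the reference for the actual behaviour:

```java
import java.util.Random;

final class JitterSketch {
    private static final Random RANDOM = new Random();

    /** Full-jitter backoff: a random delay in [0, min(maxMillis, baseMillis * power^attempt)]. */
    static long fullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt) {
        long exponentialBackoff = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
        return (long) (RANDOM.nextDouble() * exponentialBackoff);
    }

    /** Hypothetical up-front delay so that parallel subtasks do not all call Kinesis at the same instant. */
    static long initialJitterMillis(long maxJitterMillis) {
        return (long) (RANDOM.nextDouble() * maxJitterMillis);
    }
}
```

   In the `LAZY` path each subtask would sleep for the initial jitter before its first call, then fall back to the backoff helper on recoverable errors.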

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+       private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+       /** Random seed used to calculate backoff jitter for Kinesis 
operations. */
+       private static final Random seed = new Random();
 
        private final KinesisAsyncClient kinesisAsyncClient;
 
+       private final FanOutRecordPublisherConfiguration 
fanOutRecordPublisherConfiguration;
+
        /**
         * Create a new KinesisProxyV2 based on the supplied configuration 
properties.
         *
-        * @param kinesisAsyncClient the kinesis async client used to 
communicate with Kinesis
+        * @param configProps configuration properties containing AWS 
credential and AWS region info
+        */
+       public KinesisProxyV2(final Properties configProps, List<String> 
streams) {
+               this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+               this.fanOutRecordPublisherConfiguration = new 
FanOutRecordPublisherConfiguration(configProps, streams);
+       }
+
+       /**
+        * Creates a Kinesis proxy V2.
+        *
+        * @param configProps configuration properties
+        * @param streams list of kinesis stream names
+        * @return the created kinesis proxy v2
+        */
+       public static KinesisProxyV2Interface create(Properties configProps, 
List<String> streams) {
+               return new KinesisProxyV2(configProps, streams);
+       }
+
+       /**
+        * Create the Kinesis client, using the provided configuration 
properties.
+        * Derived classes can override this method to customize the client 
configuration.
+        *
+        * @param configProps the properties map used to create the Kinesis 
Client
+        * @return a Kinesis Client
+        */
+       protected KinesisAsyncClient createKinesisAsyncClient(final Properties 
configProps) {
+               final ClientConfiguration config = new 
ClientConfigurationFactory().getConfig();
+               return AwsV2Util.createKinesisAsyncClient(configProps, config);
+       }
+
+       /**
+        * {@inheritDoc}
         */
-       public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-               this.kinesisAsyncClient = kinesisAsyncClient;
+       @Override
+       public Map<String, String> describeStream(List<String> streams) throws 
InterruptedException, ExecutionException {

Review comment:
       @xiaolong-sn this method is called `describeStream`, yet it describes 
multiple streams. Please rename it or split it into two methods:
   - `describeStream`
   - `describeStreams`

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+       private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+       /** Random seed used to calculate backoff jitter for Kinesis 
operations. */
+       private static final Random seed = new Random();
 
        private final KinesisAsyncClient kinesisAsyncClient;
 
+       private final FanOutRecordPublisherConfiguration 
fanOutRecordPublisherConfiguration;
+
        /**
         * Create a new KinesisProxyV2 based on the supplied configuration 
properties.
         *
-        * @param kinesisAsyncClient the kinesis async client used to 
communicate with Kinesis
+        * @param configProps configuration properties containing AWS 
credential and AWS region info
+        */
+       public KinesisProxyV2(final Properties configProps, List<String> 
streams) {
+               this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+               this.fanOutRecordPublisherConfiguration = new 
FanOutRecordPublisherConfiguration(configProps, streams);
+       }
+
+       /**
+        * Creates a Kinesis proxy V2.
+        *
+        * @param configProps configuration properties
+        * @param streams list of kinesis stream names
+        * @return the created kinesis proxy v2
+        */
+       public static KinesisProxyV2Interface create(Properties configProps, 
List<String> streams) {
+               return new KinesisProxyV2(configProps, streams);
+       }
+
+       /**
+        * Create the Kinesis client, using the provided configuration 
properties.
+        * Derived classes can override this method to customize the client 
configuration.
+        *
+        * @param configProps the properties map used to create the Kinesis 
Client
+        * @return a Kinesis Client
+        */
+       protected KinesisAsyncClient createKinesisAsyncClient(final Properties 
configProps) {
+               final ClientConfiguration config = new 
ClientConfigurationFactory().getConfig();
+               return AwsV2Util.createKinesisAsyncClient(configProps, config);
+       }
+
+       /**
+        * {@inheritDoc}
         */
-       public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-               this.kinesisAsyncClient = kinesisAsyncClient;
+       @Override
+       public Map<String, String> describeStream(List<String> streams) throws 
InterruptedException, ExecutionException {
+               Map<String, String> result = new HashMap<>();
+               for (String stream : streams) {
+                       DescribeStreamRequest describeStreamRequest = 
DescribeStreamRequest
+                               .builder()
+                               .streamName(stream)
+                               .build();
+                       DescribeStreamResponse describeStreamResponse = null;
+
+                       int retryCount = 0;
+                       while (retryCount <= 
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && 
describeStreamResponse == null) {
+                               try {
+                                       describeStreamResponse = 
kinesisAsyncClient.describeStream(describeStreamRequest).get();
+                               } catch (ExecutionException ex) {
+                                       if 
(AwsV2Util.isRecoverableException(ex)) {
+                                               long backoffMillis = 
fullJitterBackoff(
+                                                       
fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), 
fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), 
fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), 
retryCount++);
+                                               LOG.warn("Got recoverable 
AmazonServiceException when trying to describe stream " + stream + ". Backing 
off for "
+                                                       + backoffMillis + " 
millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+                                               Thread.sleep(backoffMillis);
+                                       } else {
+                                               throw ex;
+                                       }
+                               }
+                       }
+                       if (describeStreamResponse == null) {
+                               throw new RuntimeException("Retries exceeded 
for describeStream operation - all " + 
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+                                       " retry attempts failed.");
+                       }
+                       result.put(stream, 
describeStreamResponse.streamDescription().streamARN());
+               }
+               return result;
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public List<FanOutStreamInfo> registerStreamConsumer(Map<String, 
String> streamArns) throws InterruptedException, ExecutionException {
+               
Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+               String consumerName = 
fanOutRecordPublisherConfiguration.getConsumerName().get();
+               List<FanOutStreamInfo> result = new ArrayList<>();
+               for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+                       String stream = entry.getKey();
+                       String streamArn = entry.getValue();
+                       RegisterStreamConsumerRequest 
registerStreamConsumerRequest = RegisterStreamConsumerRequest

Review comment:
       Ah, I see you are using `describeStreamConsumer` instead of list. 
That is actually better. But I think you should do that first, as per the flow 
chart: just replace List with Describe. If the consumer is already registered, 
it will be 1 call instead of 2 per task. 
   
   Describe has a higher TPS too so that is good!
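
   A sketch of the ordering I mean. `ConsumerRegistry` is a hypothetical, simplified view of the two proxy calls, and the `Optional` stands in for however the real describe call reports that the consumer does not exist yet:

```java
import java.util.Optional;

/** Hypothetical, simplified view of the two proxy calls involved. */
interface ConsumerRegistry {
    /** Returns the consumer ARN if the consumer is already registered on the stream. */
    Optional<String> describeStreamConsumer(String streamArn, String consumerName);

    /** Registers the consumer on the stream and returns its ARN. */
    String registerStreamConsumer(String streamArn, String consumerName);
}

final class RegistrationSketch {
    /** Describe first; register only when the consumer is not there yet. */
    static String ensureRegistered(ConsumerRegistry registry, String streamArn, String consumerName) {
        return registry
            .describeStreamConsumer(streamArn, consumerName)
            .orElseGet(() -> registry.registerStreamConsumer(streamArn, consumerName));
    }
}
```

   When the consumer already exists, only the describe call is made.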

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+       private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+       /** Random seed used to calculate backoff jitter for Kinesis 
operations. */
+       private static final Random seed = new Random();
 
        private final KinesisAsyncClient kinesisAsyncClient;
 
+       private final FanOutRecordPublisherConfiguration 
fanOutRecordPublisherConfiguration;
+
        /**
         * Create a new KinesisProxyV2 based on the supplied configuration 
properties.
         *
-        * @param kinesisAsyncClient the kinesis async client used to 
communicate with Kinesis
+        * @param configProps configuration properties containing AWS 
credential and AWS region info
+        */
+       public KinesisProxyV2(final Properties configProps, List<String> 
streams) {
+               this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+               this.fanOutRecordPublisherConfiguration = new 
FanOutRecordPublisherConfiguration(configProps, streams);
+       }
+
+       /**
+        * Creates a Kinesis proxy V2.
+        *
+        * @param configProps configuration properties
+        * @param streams list of kinesis stream names
+        * @return the created kinesis proxy v2
+        */
+       public static KinesisProxyV2Interface create(Properties configProps, 
List<String> streams) {
+               return new KinesisProxyV2(configProps, streams);
+       }
+
+       /**
+        * Create the Kinesis client, using the provided configuration 
properties.
+        * Derived classes can override this method to customize the client 
configuration.
+        *
+        * @param configProps the properties map used to create the Kinesis 
Client
+        * @return a Kinesis Client
+        */
+       protected KinesisAsyncClient createKinesisAsyncClient(final Properties 
configProps) {
+               final ClientConfiguration config = new 
ClientConfigurationFactory().getConfig();
+               return AwsV2Util.createKinesisAsyncClient(configProps, config);
+       }
+
+       /**
+        * {@inheritDoc}
         */
-       public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-               this.kinesisAsyncClient = kinesisAsyncClient;
+       @Override
+       public Map<String, String> describeStream(List<String> streams) throws 
InterruptedException, ExecutionException {

Review comment:
       @xiaolong-sn considering we cannot batch the calls, I would opt to expose 
just a single `describeStream` method and loop externally, which removes the Map 
and the additional complexity around it:
   -  `DescribeStreamResponse describeStream(final String streamName)`
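
   The caller side could then look roughly like this. The type parameter `R` stands in for `DescribeStreamResponse`, and `describeEach` is a hypothetical helper name used only for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class DescribeLoopSketch {
    /**
     * Calls a single-stream describe function once per stream name, in order.
     * The proxy itself only needs to expose the single-stream call.
     */
    static <R> List<R> describeEach(List<String> streamNames, Function<String, R> describeStream) {
        List<R> responses = new ArrayList<>();
        for (String streamName : streamNames) {
            responses.add(describeStream.apply(streamName));
        }
        return responses;
    }
}
```

   The caller decides what to keep from each response (for example the stream ARN), so the proxy does not have to build a Map.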




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

