xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463385042



##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+       private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+       /** Random seed used to calculate backoff jitter for Kinesis 
operations. */
+       private static final Random seed = new Random();
 
        private final KinesisAsyncClient kinesisAsyncClient;
 
+       private final FanOutRecordPublisherConfiguration 
fanOutRecordPublisherConfiguration;
+
        /**
         * Create a new KinesisProxyV2 based on the supplied configuration 
properties.
         *
-        * @param kinesisAsyncClient the kinesis async client used to 
communicate with Kinesis
+        * @param configProps configuration properties containing AWS 
credential and AWS region info
+        */
+       public KinesisProxyV2(final Properties configProps, List<String> 
streams) {
+               this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+               this.fanOutRecordPublisherConfiguration = new 
FanOutRecordPublisherConfiguration(configProps, streams);
+       }
+
+       /**
+        * Creates a Kinesis proxy V2.
+        *
+        * @param configProps configuration properties
+        * @param streams list of kinesis stream names
+        * @return the created kinesis proxy v2
+        */
+       public static KinesisProxyV2Interface create(Properties configProps, 
List<String> streams) {
+               return new KinesisProxyV2(configProps, streams);
+       }
+
+       /**
+        * Create the Kinesis client, using the provided configuration 
properties.
+        * Derived classes can override this method to customize the client 
configuration.
+        *
+        * @param configProps the properties map used to create the Kinesis 
Client
+        * @return a Kinesis Client
+        */
+       protected KinesisAsyncClient createKinesisAsyncClient(final Properties 
configProps) {
+               final ClientConfiguration config = new 
ClientConfigurationFactory().getConfig();
+               return AwsV2Util.createKinesisAsyncClient(configProps, config);
+       }
+
+       /**
+        * {@inheritDoc}
         */
-       public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-               this.kinesisAsyncClient = kinesisAsyncClient;
+       @Override
+       public Map<String, String> describeStream(List<String> streams) throws 
InterruptedException, ExecutionException {
+               Map<String, String> result = new HashMap<>();
+               for (String stream : streams) {
+                       DescribeStreamRequest describeStreamRequest = 
DescribeStreamRequest
+                               .builder()
+                               .streamName(stream)
+                               .build();
+                       DescribeStreamResponse describeStreamResponse = null;
+
+                       int retryCount = 0;
+                       while (retryCount <= 
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() && 
describeStreamResponse == null) {
+                               try {
+                                       describeStreamResponse = 
kinesisAsyncClient.describeStream(describeStreamRequest).get();
+                               } catch (ExecutionException ex) {
+                                       if 
(AwsV2Util.isRecoverableException(ex)) {
+                                               long backoffMillis = 
fullJitterBackoff(
+                                                       
fanOutRecordPublisherConfiguration.getDescribeStreamBaseBackoffMillis(), 
fanOutRecordPublisherConfiguration.getDescribeStreamMaxBackoffMillis(), 
fanOutRecordPublisherConfiguration.getDescribeStreamExpConstant(), 
retryCount++);
+                                               LOG.warn("Got recoverable 
AmazonServiceException when trying to describe stream " + stream + ". Backing 
off for "
+                                                       + backoffMillis + " 
millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+                                               Thread.sleep(backoffMillis);
+                                       } else {
+                                               throw ex;
+                                       }
+                               }
+                       }
+                       if (describeStreamResponse == null) {
+                               throw new RuntimeException("Retries exceeded 
for describeStream operation - all " + 
fanOutRecordPublisherConfiguration.getDescribeStreamMaxRetries() +
+                                       " retry attempts failed.");
+                       }
+                       result.put(stream, 
describeStreamResponse.streamDescription().streamARN());
+               }
+               return result;
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public List<FanOutStreamInfo> registerStreamConsumer(Map<String, 
String> streamArns) throws InterruptedException, ExecutionException {
+               
Preconditions.checkArgument(fanOutRecordPublisherConfiguration.getConsumerName().isPresent());
+               String consumerName = 
fanOutRecordPublisherConfiguration.getConsumerName().get();
+               List<FanOutStreamInfo> result = new ArrayList<>();
+               for (Map.Entry<String, String> entry : streamArns.entrySet()) {
+                       String stream = entry.getKey();
+                       String streamArn = entry.getValue();
+                       RegisterStreamConsumerRequest 
registerStreamConsumerRequest = RegisterStreamConsumerRequest
+                               .builder()
+                               .consumerName(consumerName)
+                               .streamARN(streamArn)
+                               .build();
+                       FanOutStreamInfo fanOutStreamInfo = null;
+                       int retryCount = 0;
+                       while (retryCount <= 
fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() && 
fanOutStreamInfo == null) {
+                               try {
+                                       RegisterStreamConsumerResponse 
registerStreamConsumerResponse = 
kinesisAsyncClient.registerStreamConsumer(registerStreamConsumerRequest).get();
+                                       fanOutStreamInfo = new 
FanOutStreamInfo(stream, streamArn, consumerName, 
registerStreamConsumerResponse.consumer().consumerARN());
+                               } catch (ExecutionException ex) {
+                                       if (AwsV2Util.isResourceInUse(ex)) {
+                                               fanOutStreamInfo = 
describeStreamConsumer(stream, streamArn, consumerName);
+                                       } else if 
(AwsV2Util.isRecoverableException(ex)) {
+                                               long backoffMillis = 
fullJitterBackoff(
+                                                       
fanOutRecordPublisherConfiguration.getRegisterStreamBaseBackoffMillis(), 
fanOutRecordPublisherConfiguration.getRegisterStreamMaxBackoffMillis(), 
fanOutRecordPublisherConfiguration.getRegisterStreamExpConstant(), 
retryCount++);
+                                               LOG.warn("Got recoverable 
AmazonServiceException when trying to register " + stream + ". Backing off for "
+                                                       + backoffMillis + " 
millis (" + ex.getClass().getName() + ": " + ex.getMessage() + ")");
+                                               Thread.sleep(backoffMillis);
+                                       } else {
+                                               throw ex;
+                                       }
+                               }
+                       }
+
+                       if (fanOutStreamInfo == null) {
+                               throw new RuntimeException("Retries exceeded 
for registerStream operation - all " + 
fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() +
+                                       " retry attempts failed.");
+                       }
+                       result.add(fanOutStreamInfo);
+               }
+               return result;
+       }
+
+       public FanOutStreamInfo describeStreamConsumer(String stream, String 
streamArn, String consumerName) throws InterruptedException, ExecutionException 
 {
+               DescribeStreamConsumerRequest describeStreamConsumerRequest = 
DescribeStreamConsumerRequest
+                       .builder()
+                       .streamARN(streamArn)
+                       .consumerName(consumerName)
+                       .build();
+               FanOutStreamInfo fanOutStreamInfo = null;
+               int retryCount = 0;
+               while (retryCount <= 
fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxRetries() && 
fanOutStreamInfo == null) {
+                       try {
+                               DescribeStreamConsumerResponse 
describeStreamConsumerResponse = 
kinesisAsyncClient.describeStreamConsumer(describeStreamConsumerRequest).get();
+                               fanOutStreamInfo = new FanOutStreamInfo(stream, 
streamArn, consumerName, 
describeStreamConsumerResponse.consumerDescription().consumerARN());
+                       } catch (ExecutionException ex) {
+                               if (AwsV2Util.isRecoverableException(ex)) {
+                                       long backoffMillis = fullJitterBackoff(
+                                               
fanOutRecordPublisherConfiguration.getDescribeStreamConsumerBaseBackoffMillis(),
 
fanOutRecordPublisherConfiguration.getDescribeStreamConsumerMaxBackoffMillis(), 
fanOutRecordPublisherConfiguration.getDescribeStreamConsumerExpConstant(), 
retryCount++);
+                                       LOG.warn("Got recoverable 
AmazonServiceException when trying to describe stream consumer " + stream + ". 
Backing off for "
+                                               + backoffMillis + " millis (" + 
ex.getClass().getName() + ": " + ex.getMessage() + ")");
+                                       Thread.sleep(backoffMillis);
+                               } else {
+                                       throw ex;
+                               }
+                       }
+               }
+
+               if (fanOutStreamInfo == null) {
+                       throw new RuntimeException("Retries exceeded for 
registerStream operation - all " + 
fanOutRecordPublisherConfiguration.getRegisterStreamMaxRetries() +
+                               " retry attempts failed.");
+               }
+               return fanOutStreamInfo;
        }
 
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public void deregisterStreamConsumer(List<FanOutStreamInfo> 
fanOutStreamInfos) throws InterruptedException, ExecutionException {

Review comment:
       I changed the behavior and interface.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to