xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463453684



##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
  */
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
+       private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+       /** Random seed used to calculate backoff jitter for Kinesis 
operations. */
+       private static final Random seed = new Random();
 
        private final KinesisAsyncClient kinesisAsyncClient;
 
+       private final FanOutRecordPublisherConfiguration 
fanOutRecordPublisherConfiguration;
+
        /**
         * Create a new KinesisProxyV2 based on the supplied configuration 
properties.
         *
-        * @param kinesisAsyncClient the kinesis async client used to 
communicate with Kinesis
+        * @param configProps configuration properties containing AWS 
credential and AWS region info
+        */
+       public KinesisProxyV2(final Properties configProps, List<String> 
streams) {
+               this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+               this.fanOutRecordPublisherConfiguration = new 
FanOutRecordPublisherConfiguration(configProps, streams);
+       }
+
+       /**
+        * Creates a Kinesis proxy V2.
+        *
+        * @param configProps configuration properties
+        * @param streams list of kinesis stream names
+        * @return the created kinesis proxy v2
+        */
+       public static KinesisProxyV2Interface create(Properties configProps, 
List<String> streams) {
+               return new KinesisProxyV2(configProps, streams);
+       }
+
+       /**
+        * Create the Kinesis client, using the provided configuration 
properties.
+        * Derived classes can override this method to customize the client 
configuration.
+        *
+        * @param configProps the properties map used to create the Kinesis 
Client
+        * @return a Kinesis Client
+        */
+       protected KinesisAsyncClient createKinesisAsyncClient(final Properties 
configProps) {
+               final ClientConfiguration config = new 
ClientConfigurationFactory().getConfig();
+               return AwsV2Util.createKinesisAsyncClient(configProps, config);
+       }
+
+       /**
+        * {@inheritDoc}
         */
-       public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
-               this.kinesisAsyncClient = kinesisAsyncClient;
+       @Override
+       public Map<String, String> describeStream(List<String> streams) throws 
InterruptedException, ExecutionException {

Review comment:
       I just split it and made the loop external.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to