xiaolong-sn commented on a change in pull request #13005:
URL: https://github.com/apache/flink/pull/13005#discussion_r463453684
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -28,16 +51,210 @@
*/
@Internal
public class KinesisProxyV2 implements KinesisProxyV2Interface {
+ private static final Logger LOG =
LoggerFactory.getLogger(KinesisProxyV2.class);
+
+ /** Random seed used to calculate backoff jitter for Kinesis
operations. */
+ private static final Random seed = new Random();
private final KinesisAsyncClient kinesisAsyncClient;
+ private final FanOutRecordPublisherConfiguration
fanOutRecordPublisherConfiguration;
+
/**
* Create a new KinesisProxyV2 based on the supplied configuration
properties.
*
- * @param kinesisAsyncClient the kinesis async client used to
communicate with Kinesis
+ * @param configProps configuration properties containing AWS
credential and AWS region info
+ */
+ public KinesisProxyV2(final Properties configProps, List<String>
streams) {
+ this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+ this.fanOutRecordPublisherConfiguration = new
FanOutRecordPublisherConfiguration(configProps, streams);
+ }
+
+ /**
+ * Creates a Kinesis proxy V2.
+ *
+ * @param configProps configuration properties
+ * @param streams list of kinesis stream names
+ * @return the created kinesis proxy v2
+ */
+ public static KinesisProxyV2Interface create(Properties configProps,
List<String> streams) {
+ return new KinesisProxyV2(configProps, streams);
+ }
+
+ /**
+ * Create the Kinesis client, using the provided configuration
properties.
+ * Derived classes can override this method to customize the client
configuration.
+ *
+ * @param configProps the properties map used to create the Kinesis
Client
+ * @return a Kinesis Client
+ */
+ protected KinesisAsyncClient createKinesisAsyncClient(final Properties
configProps) {
+ final ClientConfiguration config = new
ClientConfigurationFactory().getConfig();
+ return AwsV2Util.createKinesisAsyncClient(configProps, config);
+ }
+
+ /**
+ * {@inheritDoc}
*/
- public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
- this.kinesisAsyncClient = kinesisAsyncClient;
+ @Override
+ public Map<String, String> describeStream(List<String> streams) throws
InterruptedException, ExecutionException {
Review comment:
I just split it and made the loop external.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]