darenwkt commented on code in PR #39:
URL:
https://github.com/apache/flink-connector-aws/pull/39#discussion_r1051108678
##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java:
##########
@@ -55,25 +58,39 @@ public static KinesisProxyV2Interface createKinesisProxyV2(final Properties conf
final AttributeMap.Builder clientConfiguration = AttributeMap.builder();
populateDefaultValues(clientConfiguration);
- final SdkAsyncHttpClient httpClient =
+ final SdkHttpClient httpClient =
+ AWSGeneralUtil.createSyncHttpClient(
+ convertedProperties.merge(clientConfiguration.build()),
+ ApacheHttpClient.builder());
+
+ final SdkAsyncHttpClient asyncHttpClient =
AWSGeneralUtil.createAsyncHttpClient(
convertedProperties.merge(clientConfiguration.build()),
NettyNioAsyncHttpClient.builder());
+
final FanOutRecordPublisherConfiguration configuration =
new FanOutRecordPublisherConfiguration(configProps, emptyList());
- Properties asyncClientProperties =
- KinesisConfigUtil.getV2ConsumerAsyncClientProperties(configProps);
+ Properties clientProperties = KinesisConfigUtil.getV2ConsumerClientProperties(configProps);
- final KinesisAsyncClient client =
+ final KinesisAsyncClient asyncClient =
AWSAsyncSinkUtil.createAwsAsyncClient(
- asyncClientProperties,
- httpClient,
+ clientProperties,
+ asyncHttpClient,
KinesisAsyncClient.builder(),
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
- return new KinesisProxyV2(client, httpClient, configuration, BACKOFF);
+ final KinesisClient client =
+ AWSAsyncSinkUtil.createAwsSyncClient(
+ clientProperties,
+ httpClient,
+ KinesisClient.builder(),
+ KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
+ KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
+
+ return new KinesisProxyV2(
+ asyncClient, client, asyncHttpClient, httpClient, configuration, BACKOFF);
Review Comment:
Thank you for clarifying. I have split `KinesisProxyV2` into
`KinesisProxySyncV2` and `KinesisProxyAsyncV2`.
For StreamRegistrar-related operations (e.g. `registerStreamConsumer`), only
`KinesisProxySyncV2` is created, so no async client or Netty HTTP client is
built for them.
For EFO-related operations (e.g. `subscribeToShard`), only
`KinesisProxyAsyncV2` is created, so no sync client is built for them.
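
To illustrate the intent of the split, here is a minimal, self-contained sketch. It does not use the real AWS SDK or the actual classes from this PR: `FakeClient`, `SyncProxy`, `AsyncProxy`, and `ProxyFactory` are hypothetical stand-ins, and the method signatures are simplified assumptions. The point it tries to show is that each proxy owns exactly one client, so a caller that only registers consumers never causes an async client to be built.

```java
import java.util.ArrayList;
import java.util.List;

public class ProxySplitSketch {

    /** Hypothetical stand-in for a closeable SDK client (sync or async). */
    static final class FakeClient implements AutoCloseable {
        final String name;
        boolean closed = false;

        FakeClient(String name, List<String> created) {
            this.name = name;
            created.add(name); // record which clients were actually built
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Sketch of the sync-only proxy: registrar-style operations. */
    static final class SyncProxy implements AutoCloseable {
        final FakeClient client;

        SyncProxy(FakeClient client) {
            this.client = client;
        }

        String registerStreamConsumer(String streamArn, String consumerName) {
            return client.name + ":register:" + consumerName;
        }

        @Override
        public void close() {
            client.close(); // the proxy closes only the client it owns
        }
    }

    /** Sketch of the async-only proxy: EFO subscription operations. */
    static final class AsyncProxy implements AutoCloseable {
        final FakeClient client;

        AsyncProxy(FakeClient client) {
            this.client = client;
        }

        String subscribeToShard(String consumerArn, String shardId) {
            return client.name + ":subscribe:" + shardId;
        }

        @Override
        public void close() {
            client.close();
        }
    }

    /** Hypothetical factory with one creation method per proxy type. */
    static final class ProxyFactory {
        final List<String> created = new ArrayList<>();

        SyncProxy createSyncProxy() {
            return new SyncProxy(new FakeClient("sync", created));
        }

        AsyncProxy createAsyncProxy() {
            return new AsyncProxy(new FakeClient("async", created));
        }
    }

    public static void main(String[] args) {
        ProxyFactory factory = new ProxyFactory();
        // A registrar-style caller needs only the sync proxy.
        try (SyncProxy registrar = factory.createSyncProxy()) {
            System.out.println(registrar.registerStreamConsumer("stream-arn", "my-consumer"));
        }
        // Only the sync client was ever constructed.
        System.out.println(factory.created);
    }
}
```

Compared with the single six-argument `KinesisProxyV2` constructor in the diff above, this shape keeps resource ownership narrow: closing a proxy releases one client, and nothing is created that the caller does not use.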