dannycranmer commented on code in PR #39:
URL:
https://github.com/apache/flink-connector-aws/pull/39#discussion_r1050513372
##########
flink-connector-aws-base/pom.xml:
##########
@@ -52,6 +52,11 @@ under the License.
<artifactId>netty-nio-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>apache-client</artifactId>
+ </dependency>
Review Comment:
We will need to ensure this is shaded correctly and NOTICE files are updated
##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java:
##########
@@ -55,25 +58,39 @@ public static KinesisProxyV2Interface
createKinesisProxyV2(final Properties conf
final AttributeMap.Builder clientConfiguration =
AttributeMap.builder();
populateDefaultValues(clientConfiguration);
- final SdkAsyncHttpClient httpClient =
+ final SdkHttpClient httpClient =
+ AWSGeneralUtil.createSyncHttpClient(
+ convertedProperties.merge(clientConfiguration.build()),
+ ApacheHttpClient.builder());
+
+ final SdkAsyncHttpClient asyncHttpClient =
AWSGeneralUtil.createAsyncHttpClient(
convertedProperties.merge(clientConfiguration.build()),
NettyNioAsyncHttpClient.builder());
+
final FanOutRecordPublisherConfiguration configuration =
new FanOutRecordPublisherConfiguration(configProps,
emptyList());
- Properties asyncClientProperties =
-
KinesisConfigUtil.getV2ConsumerAsyncClientProperties(configProps);
+ Properties clientProperties =
KinesisConfigUtil.getV2ConsumerClientProperties(configProps);
- final KinesisAsyncClient client =
+ final KinesisAsyncClient asyncClient =
AWSAsyncSinkUtil.createAwsAsyncClient(
- asyncClientProperties,
- httpClient,
+ clientProperties,
+ asyncHttpClient,
KinesisAsyncClient.builder(),
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
- return new KinesisProxyV2(client, httpClient, configuration, BACKOFF);
+ final KinesisClient client =
+ AWSAsyncSinkUtil.createAwsSyncClient(
+ clientProperties,
+ httpClient,
+ KinesisClient.builder(),
+
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
+
KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
+
+ return new KinesisProxyV2(
+ asyncClient, client, asyncHttpClient, httpClient,
configuration, BACKOFF);
Review Comment:
As we discussed, we should split `KinesisProxyV2` so we do not need to
create both client. In the case of registerStreamConsumer it will not use the
async client and it is expensive to create a thread pool, then trash it. We
should only create the async client when we actually need to use it
##########
flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java:
##########
@@ -161,4 +163,86 @@ S createAwsAsyncClient(
.region(getRegion(configProps))
.build();
}
+
+ /**
+ * @param configProps configuration properties
+ * @param httpClient the underlying HTTP client used to talk to AWS
+ * @return a new AWS Sync Client
+ */
+ public static <
+ S extends SdkClient,
+ T extends
+ AwsSyncClientBuilder<? extends T, S> &
AwsClientBuilder<? extends T, S>>
+ S createAwsSyncClient(
+ final Properties configProps,
+ final SdkHttpClient httpClient,
+ final T clientBuilder,
+ final String awsUserAgentPrefixFormat,
+ final String awsClientUserAgentPrefix) {
+ SdkClientConfiguration clientConfiguration =
SdkClientConfiguration.builder().build();
+ return createAwsSyncClient(
+ configProps,
+ clientConfiguration,
+ httpClient,
+ clientBuilder,
+ awsUserAgentPrefixFormat,
+ awsClientUserAgentPrefix);
+ }
+
+ /**
+ * @param configProps configuration properties
+ * @param clientConfiguration the AWS SDK v2 config to instantiate the
client
+ * @param httpClient the underlying HTTP client used to talk to AWS
+ * @return a new AWS Sync Client
+ */
+ public static <
+ S extends SdkClient,
+ T extends
+ AwsSyncClientBuilder<? extends T, S> &
AwsClientBuilder<? extends T, S>>
+ S createAwsSyncClient(
Review Comment:
Why do we need so many overloaded versions of this? Looks like only 1 is used
##########
flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java:
##########
@@ -301,6 +303,58 @@ public static SdkAsyncHttpClient createAsyncHttpClient(
return
httpClientBuilder.buildWithDefaults(config.merge(HTTP_CLIENT_DEFAULTS));
}
+ public static SdkHttpClient createSyncHttpClient(final Properties
configProperties) {
+ return createSyncHttpClient(configProperties,
ApacheHttpClient.builder());
+ }
+
+ public static SdkHttpClient createSyncHttpClient(
+ final Properties configProperties, final ApacheHttpClient.Builder
httpClientBuilder) {
+
+ final AttributeMap.Builder clientConfiguration =
+
AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true);
+
+ Optional.ofNullable(
+ configProperties.getProperty(
+
AWSConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY))
+ .map(Integer::parseInt)
+ .ifPresent(
+ integer ->
+ clientConfiguration.put(
+
SdkHttpConfigurationOption.MAX_CONNECTIONS, integer));
+
+ Optional.ofNullable(
+ configProperties.getProperty(
+
AWSConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS))
+ .map(Integer::parseInt)
+ .map(Duration::ofMillis)
+ .ifPresent(
+ timeout ->
+ clientConfiguration.put(
+
SdkHttpConfigurationOption.READ_TIMEOUT, timeout));
+
+
Optional.ofNullable(configProperties.getProperty(AWSConfigConstants.TRUST_ALL_CERTIFICATES))
+ .map(Boolean::parseBoolean)
+ .ifPresent(
+ bool ->
+ clientConfiguration.put(
+
SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, bool));
+
+
Optional.ofNullable(configProperties.getProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION))
+ .map(Protocol::valueOf)
+ .ifPresent(
+ protocol ->
+ clientConfiguration.put(
+ SdkHttpConfigurationOption.PROTOCOL,
protocol));
+
Review Comment:
Seems like this is duplicated for Async/Sync. Can we extract to a common
method?
##########
flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java:
##########
@@ -161,4 +163,86 @@ S createAwsAsyncClient(
.region(getRegion(configProps))
.build();
}
+
+ /**
+ * @param configProps configuration properties
+ * @param httpClient the underlying HTTP client used to talk to AWS
+ * @return a new AWS Sync Client
+ */
+ public static <
+ S extends SdkClient,
+ T extends
+ AwsSyncClientBuilder<? extends T, S> &
AwsClientBuilder<? extends T, S>>
+ S createAwsSyncClient(
+ final Properties configProps,
+ final SdkHttpClient httpClient,
+ final T clientBuilder,
+ final String awsUserAgentPrefixFormat,
+ final String awsClientUserAgentPrefix) {
+ SdkClientConfiguration clientConfiguration =
SdkClientConfiguration.builder().build();
+ return createAwsSyncClient(
+ configProps,
+ clientConfiguration,
+ httpClient,
+ clientBuilder,
+ awsUserAgentPrefixFormat,
+ awsClientUserAgentPrefix);
+ }
+
+ /**
+ * @param configProps configuration properties
+ * @param clientConfiguration the AWS SDK v2 config to instantiate the
client
+ * @param httpClient the underlying HTTP client used to talk to AWS
+ * @return a new AWS Sync Client
+ */
+ public static <
+ S extends SdkClient,
+ T extends
+ AwsSyncClientBuilder<? extends T, S> &
AwsClientBuilder<? extends T, S>>
+ S createAwsSyncClient(
+ final Properties configProps,
+ final SdkClientConfiguration clientConfiguration,
+ final SdkHttpClient httpClient,
+ final T clientBuilder,
+ final String awsUserAgentPrefixFormat,
+ final String awsClientUserAgentPrefix) {
+ String flinkUserAgentPrefix =
+
Optional.ofNullable(configProps.getProperty(awsClientUserAgentPrefix))
+ .orElse(
+ formatFlinkUserAgentPrefix(
+ awsUserAgentPrefixFormat +
V2_USER_AGENT_SUFFIX));
+
+ final ClientOverrideConfiguration overrideConfiguration =
+ createClientOverrideConfiguration(
+ clientConfiguration,
+ ClientOverrideConfiguration.builder(),
+ flinkUserAgentPrefix);
+
+ return createAwsSyncClient(configProps, clientBuilder, httpClient,
overrideConfiguration);
+ }
+
+ @VisibleForTesting
Review Comment:
Where is this tested such that it requires this scope?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]