dannycranmer commented on code in PR #39:
URL: 
https://github.com/apache/flink-connector-aws/pull/39#discussion_r1050513372


##########
flink-connector-aws-base/pom.xml:
##########
@@ -52,6 +52,11 @@ under the License.
             <artifactId>netty-nio-client</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>apache-client</artifactId>
+        </dependency>

Review Comment:
   We will need to ensure this is shaded correctly and NOTICE files are updated 



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java:
##########
@@ -55,25 +58,39 @@ public static KinesisProxyV2Interface 
createKinesisProxyV2(final Properties conf
         final AttributeMap.Builder clientConfiguration = 
AttributeMap.builder();
         populateDefaultValues(clientConfiguration);
 
-        final SdkAsyncHttpClient httpClient =
+        final SdkHttpClient httpClient =
+                AWSGeneralUtil.createSyncHttpClient(
+                        convertedProperties.merge(clientConfiguration.build()),
+                        ApacheHttpClient.builder());
+
+        final SdkAsyncHttpClient asyncHttpClient =
                 AWSGeneralUtil.createAsyncHttpClient(
                         convertedProperties.merge(clientConfiguration.build()),
                         NettyNioAsyncHttpClient.builder());
+
         final FanOutRecordPublisherConfiguration configuration =
                 new FanOutRecordPublisherConfiguration(configProps, 
emptyList());
 
-        Properties asyncClientProperties =
-                
KinesisConfigUtil.getV2ConsumerAsyncClientProperties(configProps);
+        Properties clientProperties = 
KinesisConfigUtil.getV2ConsumerClientProperties(configProps);
 
-        final KinesisAsyncClient client =
+        final KinesisAsyncClient asyncClient =
                 AWSAsyncSinkUtil.createAwsAsyncClient(
-                        asyncClientProperties,
-                        httpClient,
+                        clientProperties,
+                        asyncHttpClient,
                         KinesisAsyncClient.builder(),
                         
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
                         
KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
 
-        return new KinesisProxyV2(client, httpClient, configuration, BACKOFF);
+        final KinesisClient client =
+                AWSAsyncSinkUtil.createAwsSyncClient(
+                        clientProperties,
+                        httpClient,
+                        KinesisClient.builder(),
+                        
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
+                        
KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
+
+        return new KinesisProxyV2(
+                asyncClient, client, asyncHttpClient, httpClient, 
configuration, BACKOFF);

Review Comment:
   As we discussed, we should split `KinesisProxyV2` so we do not need to 
create both client. In the case of registerStreamConsumer it will not use the 
async client and it is expensive to create a thread pool, then trash it. We 
should only create the async client when we actually need to use it



##########
flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java:
##########
@@ -161,4 +163,86 @@ S createAwsAsyncClient(
                 .region(getRegion(configProps))
                 .build();
     }
+
+    /**
+     * @param configProps configuration properties
+     * @param httpClient the underlying HTTP client used to talk to AWS
+     * @return a new AWS Sync Client
+     */
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsSyncClientBuilder<? extends T, S> & 
AwsClientBuilder<? extends T, S>>
+            S createAwsSyncClient(
+                    final Properties configProps,
+                    final SdkHttpClient httpClient,
+                    final T clientBuilder,
+                    final String awsUserAgentPrefixFormat,
+                    final String awsClientUserAgentPrefix) {
+        SdkClientConfiguration clientConfiguration = 
SdkClientConfiguration.builder().build();
+        return createAwsSyncClient(
+                configProps,
+                clientConfiguration,
+                httpClient,
+                clientBuilder,
+                awsUserAgentPrefixFormat,
+                awsClientUserAgentPrefix);
+    }
+
+    /**
+     * @param configProps configuration properties
+     * @param clientConfiguration the AWS SDK v2 config to instantiate the 
client
+     * @param httpClient the underlying HTTP client used to talk to AWS
+     * @return a new AWS Sync Client
+     */
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsSyncClientBuilder<? extends T, S> & 
AwsClientBuilder<? extends T, S>>
+            S createAwsSyncClient(

Review Comment:
   Why do we need so many overloaded versions of this? Looks like only 1 is used



##########
flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java:
##########
@@ -301,6 +303,58 @@ public static SdkAsyncHttpClient createAsyncHttpClient(
         return 
httpClientBuilder.buildWithDefaults(config.merge(HTTP_CLIENT_DEFAULTS));
     }
 
+    public static SdkHttpClient createSyncHttpClient(final Properties 
configProperties) {
+        return createSyncHttpClient(configProperties, 
ApacheHttpClient.builder());
+    }
+
+    public static SdkHttpClient createSyncHttpClient(
+            final Properties configProperties, final ApacheHttpClient.Builder 
httpClientBuilder) {
+
+        final AttributeMap.Builder clientConfiguration =
+                
AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true);
+
+        Optional.ofNullable(
+                        configProperties.getProperty(
+                                
AWSConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY))
+                .map(Integer::parseInt)
+                .ifPresent(
+                        integer ->
+                                clientConfiguration.put(
+                                        
SdkHttpConfigurationOption.MAX_CONNECTIONS, integer));
+
+        Optional.ofNullable(
+                        configProperties.getProperty(
+                                
AWSConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS))
+                .map(Integer::parseInt)
+                .map(Duration::ofMillis)
+                .ifPresent(
+                        timeout ->
+                                clientConfiguration.put(
+                                        
SdkHttpConfigurationOption.READ_TIMEOUT, timeout));
+
+        
Optional.ofNullable(configProperties.getProperty(AWSConfigConstants.TRUST_ALL_CERTIFICATES))
+                .map(Boolean::parseBoolean)
+                .ifPresent(
+                        bool ->
+                                clientConfiguration.put(
+                                        
SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, bool));
+
+        
Optional.ofNullable(configProperties.getProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION))
+                .map(Protocol::valueOf)
+                .ifPresent(
+                        protocol ->
+                                clientConfiguration.put(
+                                        SdkHttpConfigurationOption.PROTOCOL, 
protocol));
+

Review Comment:
   Seems like this is duplicated for Async/Sync. Can we extract to a common 
method?



##########
flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java:
##########
@@ -161,4 +163,86 @@ S createAwsAsyncClient(
                 .region(getRegion(configProps))
                 .build();
     }
+
+    /**
+     * @param configProps configuration properties
+     * @param httpClient the underlying HTTP client used to talk to AWS
+     * @return a new AWS Sync Client
+     */
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsSyncClientBuilder<? extends T, S> & 
AwsClientBuilder<? extends T, S>>
+            S createAwsSyncClient(
+                    final Properties configProps,
+                    final SdkHttpClient httpClient,
+                    final T clientBuilder,
+                    final String awsUserAgentPrefixFormat,
+                    final String awsClientUserAgentPrefix) {
+        SdkClientConfiguration clientConfiguration = 
SdkClientConfiguration.builder().build();
+        return createAwsSyncClient(
+                configProps,
+                clientConfiguration,
+                httpClient,
+                clientBuilder,
+                awsUserAgentPrefixFormat,
+                awsClientUserAgentPrefix);
+    }
+
+    /**
+     * @param configProps configuration properties
+     * @param clientConfiguration the AWS SDK v2 config to instantiate the 
client
+     * @param httpClient the underlying HTTP client used to talk to AWS
+     * @return a new AWS Sync Client
+     */
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsSyncClientBuilder<? extends T, S> & 
AwsClientBuilder<? extends T, S>>
+            S createAwsSyncClient(
+                    final Properties configProps,
+                    final SdkClientConfiguration clientConfiguration,
+                    final SdkHttpClient httpClient,
+                    final T clientBuilder,
+                    final String awsUserAgentPrefixFormat,
+                    final String awsClientUserAgentPrefix) {
+        String flinkUserAgentPrefix =
+                
Optional.ofNullable(configProps.getProperty(awsClientUserAgentPrefix))
+                        .orElse(
+                                formatFlinkUserAgentPrefix(
+                                        awsUserAgentPrefixFormat + 
V2_USER_AGENT_SUFFIX));
+
+        final ClientOverrideConfiguration overrideConfiguration =
+                createClientOverrideConfiguration(
+                        clientConfiguration,
+                        ClientOverrideConfiguration.builder(),
+                        flinkUserAgentPrefix);
+
+        return createAwsSyncClient(configProps, clientBuilder, httpClient, 
overrideConfiguration);
+    }
+
+    @VisibleForTesting

Review Comment:
   Where is this tested such that it requires this scope?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to