darenwkt commented on code in PR #202:
URL: https://github.com/apache/flink-connector-aws/pull/202#discussion_r2134500362


##########
flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/CloudWatchSinkBuilder.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import software.amazon.awssdk.http.Protocol;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link CloudWatchSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link CloudWatchSink} that writes
+ * String values to a CloudWatch named cloudWatchUrl.
+ *
+ * <pre>{@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * CloudWatchSink<String> cloudWatchSink =
+ *         CloudWatchSink.<String>builder()
+ *                 .setNamespace("namespace")
+ *                 .setCloudWatchClientProperties(sinkProperties)
+ *                 .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 100
+ *   <li>{@code maxInFlightRequests} will be 50
+ *   <li>{@code maxBufferedRequests} will be 5000
+ *   <li>{@code maxBatchSizeInBytes} will be 100 KB
+ *   <li>{@code maxTimeInBufferMs} will be 5000ms
+ *   <li>{@code maxRecordSizeInBytes} will be 1 KB
+ *   <li>{@code invalidMetricDataRetryMode} will be FAIL_ON_ERROR
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class CloudWatchSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<InputT, MetricWriteRequest, CloudWatchSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 100;
+    private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 5_000;
+    private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 100 * 1000;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1000;
+    private static final InvalidMetricDataRetryMode DEFAULT_INVALID_METRIC_DATA_RETRY_MODE =
+            InvalidMetricDataRetryMode.FAIL_ON_ERROR;
+    private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1;

Review Comment:
   Yes, they are configurable. They are configured through
`AsyncSinkBaseBuilder`, so we don't have to add them here. In the Table API,
they are configured in `addAsyncOptionsToBuilder` in
`CloudWatchDynamicTableFactory`.
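
A rough sketch of the pattern described in this comment: the tuning setters live on a shared, self-typed base builder, and the concrete builder only supplies defaults for values the caller left unset. Everything here is a hypothetical stand-in (`BaseBuilderSketch`, `CloudWatchBuilderSketch`, `SinkConfigSketch`); it is not Flink's `AsyncSinkBaseBuilder`, and only two of the options are modelled.

```java
import java.util.Optional;

/** Hypothetical stand-in for a shared base builder; NOT Flink's AsyncSinkBaseBuilder. */
abstract class BaseBuilderSketch<B extends BaseBuilderSketch<B>> {
    private Integer maxBatchSize; // null means "not set by the caller"
    private Long maxTimeInBufferMs; // null means "not set by the caller"

    @SuppressWarnings("unchecked")
    private B self() {
        return (B) this; // safe as long as subclasses pass themselves as B
    }

    public B setMaxBatchSize(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
        return self();
    }

    public B setMaxTimeInBufferMs(long maxTimeInBufferMs) {
        this.maxTimeInBufferMs = maxTimeInBufferMs;
        return self();
    }

    protected Optional<Integer> maxBatchSize() {
        return Optional.ofNullable(maxBatchSize);
    }

    protected Optional<Long> maxTimeInBufferMs() {
        return Optional.ofNullable(maxTimeInBufferMs);
    }
}

/** Immutable result produced by the hypothetical builder. */
final class SinkConfigSketch {
    final String namespace;
    final int maxBatchSize;
    final long maxTimeInBufferMs;

    SinkConfigSketch(String namespace, int maxBatchSize, long maxTimeInBufferMs) {
        this.namespace = namespace;
        this.maxBatchSize = maxBatchSize;
        this.maxTimeInBufferMs = maxTimeInBufferMs;
    }
}

/** Concrete builder: adds only sink-specific options and the default values. */
final class CloudWatchBuilderSketch extends BaseBuilderSketch<CloudWatchBuilderSketch> {
    private static final int DEFAULT_MAX_BATCH_SIZE = 100;
    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;

    private String namespace;

    public CloudWatchBuilderSketch setNamespace(String namespace) {
        this.namespace = namespace;
        return this;
    }

    public SinkConfigSketch build() {
        // Fall back to the defaults only for values the caller did not set.
        return new SinkConfigSketch(
                namespace,
                maxBatchSize().orElse(DEFAULT_MAX_BATCH_SIZE),
                maxTimeInBufferMs().orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS));
    }
}
```

With this shape, a caller can chain an inherited setter and a sink-specific one in either order, for example `new CloudWatchBuilderSketch().setMaxBatchSize(50).setNamespace("ns").build()`, without the concrete builder redeclaring the inherited setter.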



##########
flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/client/CloudWatchAsyncClientProvider.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.sink.client;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.cloudwatch.sink.CloudWatchConfigConstants;
+
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
+
+import java.util.Properties;
+
+/** Provides a {@link CloudWatchAsyncClient}. */
+@Internal
+public class CloudWatchAsyncClientProvider implements SdkClientProvider<CloudWatchAsyncClient> {
+
+    private final SdkAsyncHttpClient httpClient;
+    private final CloudWatchAsyncClient cloudWatchAsyncClient;

Review Comment:
   I see what you mean. I think the benefit of using `getClient` is that the
same CloudWatch client instance can be reused many times through the same
provider, so I am inclined to keep this for now. Let me know what you think.
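
To make the reuse argument concrete, here is a minimal sketch of a provider that builds its client once and hands out the same instance from every `getClient` call. The names (`ClientProviderSketch`, `FakeClient`, `CachingClientProviderSketch`) are hypothetical, and `FakeClient` merely stands in for an SDK client; this is not the code from the PR.

```java
/** Hypothetical provider contract: hands out a client and owns its lifecycle. */
interface ClientProviderSketch<T extends AutoCloseable> extends AutoCloseable {
    T getClient();

    @Override
    void close(); // narrowed so callers need not handle a checked exception
}

/** Stand-in for an SDK client; a real one would hold connections and threads. */
final class FakeClient implements AutoCloseable {
    private boolean closed;

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

/** Creates the client once and returns that same instance on every call. */
final class CachingClientProviderSketch implements ClientProviderSketch<FakeClient> {
    private final FakeClient client = new FakeClient();

    @Override
    public FakeClient getClient() {
        return client; // no new client per call
    }

    @Override
    public void close() {
        client.close(); // the provider owns the client, so it also closes it
    }
}
```

The trade-off in this sketch is that the provider, not the caller, owns the client's lifecycle, so closing the provider closes the client for everyone holding a reference to it.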



##########
flink-connector-aws/flink-connector-cloudwatch/src/main/java/org/apache/flink/connector/cloudwatch/sink/client/SdkClientProvider.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cloudwatch.sink.client;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.utils.SdkAutoCloseable;
+
+/** Provides a {@link SdkClient}. */
+@Internal
+public interface SdkClientProvider<T extends SdkClient> extends SdkAutoCloseable {

Review Comment:
   We don't strictly need it. I saw that the DDB and SQS sinks use it as well,
so I used it here to be consistent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
