Hi Daren,

Thanks for the contribution — exciting to see support for new sinks! I’ve
added a few comments and suggestions below:

1. Clarify CloudWatch API limitations and handling in the sink. [1]
Great to see we're being explicit with the Java API model! Let's be
equally explicit about the PutMetricData API constraints and how we handle
them (it would be good to have a separate section):
       1. Request size and count. The maximum size per request is 1 MB,
with a limit of 1,000 metrics per request. We need to enforce this during
batching to prevent users from shooting themselves in the foot!
       2. Values/Counts. The limit is 150 unique values per metric datum
within a single request.
       3. Data type limitations. The CloudWatch API uses Java double, but
it doesn't support Double.NaN. We need to be explicit about how we handle
improperly formatted data; we can consider failing fast/slow as you have
suggested, and consider using "StrictEntityValidation" in the failure
handling [1]. (For the design we can simply mention this and work out the
details when we implement. A rough validation sketch follows at the end of
this list.)
       4. Timestamp limitations. CloudWatch also has limitations around
accepted timestamps (as you have noted): metric data can be up to 48h in
the past or 2h in the future. Let's clarify how we handle invalid values.
       5. Data ordering. The CW API doesn't seem to specify limitations
around out-of-order / repeated data. That's great, and it would be good to
be explicit about this and to validate the behavior.
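
To make the above concrete, here is a rough sketch of what per-datum
validation could look like. The class and method names are purely
illustrative (not part of the proposed API), and the limits are the ones
listed above:

import java.time.Duration;
import java.time.Instant;

public final class MetricDatumValidator {

    private static final Duration MAX_PAST = Duration.ofHours(48);
    private static final Duration MAX_FUTURE = Duration.ofHours(2);
    private static final int MAX_VALUES_PER_DATUM = 150;

    /** Returns an error message, or null if the datum is acceptable. */
    public static String validate(double[] values, Instant timestamp) {
        if (values.length > MAX_VALUES_PER_DATUM) {
            return "more than 150 values in a single datum";
        }
        for (double v : values) {
            // CloudWatch rejects NaN and infinite values.
            if (Double.isNaN(v) || Double.isInfinite(v)) {
                return "unsupported special value: " + v;
            }
        }
        Instant now = Instant.now();
        if (timestamp.isBefore(now.minus(MAX_PAST))
                || timestamp.isAfter(now.plus(MAX_FUTURE))) {
            return "timestamp outside the accepted window";
        }
        return null;
    }
}

Whether a failed check fails fast or is routed to the failure handling is
exactly the design decision we should spell out.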
2. Clarify supported features [1]
   - The PutMetricData API supports two data modes, EntityMetricData and
MetricData [1]. Since we are only supporting MetricData for now, let's
make sure our interface allows extension to support EntityMetricData in
the future. For example, we should use our own data model classes instead
of the AWS SDK classes (rough sketch after this list). Also, we currently
propose to use wrappers instead of primitives. Let's use primitives where
we can :).
   - PutMetricData supports StrictEntityValidation [1]. As mentioned
above, let's use this.
   - I like that we enforce a single namespace per sink, since that is
aligned with the PutMetricData API interface. Let's be explicit on the
reasoning in the FLIP!
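
As a rough sketch of what a connector-owned model could look like (all
names here are illustrative, not a proposal for the final API):

public final class MetricWriteRequest {

    private final String metricName;
    private final double[] values;  // primitive double, not Double
    private final double[] counts;
    private final long timestampMillis;

    public MetricWriteRequest(
            String metricName,
            double[] values,
            double[] counts,
            long timestampMillis) {
        this.metricName = metricName;
        this.values = values;
        this.counts = counts;
        this.timestampMillis = timestampMillis;
    }

    // Dimensions, unit, storage resolution, etc. would live here as well.
    // An EntityMetricData-shaped variant can then be layered on later
    // without leaking AWS SDK classes through the public interface.
}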
3. Clarify sink data semantics. Since we're using the async sink, we only
provide at-least-once semantics. Let’s make this guarantee explicit.
4. CW sink interface. Currently we are proposing a static input data type
instead of a generic input type. This would require the user to apply a
map function separately (as you have noted). For future extensibility, I
would prefer that we expose an ElementConverter directly to the user. That
way, we can provide a custom class "MetricWriteRequest" as the output type
of the ElementConverter, which can be extended to support additional
features (e.g. EntityMetricData) in the future. A sketch follows below.
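
Roughly (reusing the Sample class from your example and the
MetricWriteRequest sketch above; ElementConverter and SinkWriter.Context
are the existing async sink interfaces, everything else here is
illustrative):

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

public class SampleToMetricWriteRequestConverter
        implements ElementConverter<Sample, MetricWriteRequest> {

    @Override
    public MetricWriteRequest apply(Sample sample, SinkWriter.Context ctx) {
        // Map the user's type to the connector-owned model directly,
        // instead of a separate MapFunction stage before the sink.
        return new MetricWriteRequest(
                "sample_latency",
                new double[] {sample.getLatency()},
                new double[] {1.0},
                sample.getTimestampMillis());
    }
}

The user would then pass this converter to the sink builder, and we keep
the door open for an EntityMetricData-aware converter later.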
5. Table API design.
   - I'm not a fan of the way we currently pass dimensions via the
properties.
   - It would be better to use Flink-native SQL support like PRIMARY KEY
instead [2]. This also enforces that the specified dimension cannot be
null. A rough sketch of the DDL is below.
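
For example (connector and property names here are placeholders, not a
concrete proposal):

CREATE TABLE cloudwatch_metrics (
    cw_dim STRING,
    cw_metric_name STRING,
    cw_value DOUBLE,
    -- Flink primary keys must be declared NOT ENFORCED; this still
    -- implies the dimension column is NOT NULL.
    PRIMARY KEY (cw_dim) NOT ENFORCED
) WITH (
    'connector' = 'cloudwatch',
    'namespace' = 'my/namespace'
);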
6. Housekeeping
  - It would be good to tidy up the public interfaces linked. For example,
we don't make any explicit use of the public interfaces from FLIP-191, so
we can remove that link.


Overall, nice FLIP! Thanks for the detail and making it an easy read. Hope
the above helps!


Cheers,
Hong


[1]
https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html

[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#primary-key




On Mon, Apr 7, 2025 at 9:24 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Daren thanks for the FLIP
>
> Just a couple of questions and comments:
>
> > Usable in both DataStream and Table API/SQL
> What about the Python API? This is something we should consider ahead of
> time, since the abstract element converter doesn't have a Flink type
> mapping that can be used from Python; this is an issue we faced with DDB
> before.
>
> > Therefore, the connector will provide a CloudWatchMetricInput model that
> user can use to pass as input to the connector. For example, in DataStream
> API, it could be a MapFunction called just before passing to the sink as
> follows:
> I am not quite sure I follow: are you suggesting we introduce a specific
> new converter class, or leave that to users? Also, since you mentioned
> FLIP-171, are you suggesting we implement this sink as an extension of
> the Async Sink? In that case it is more confusing to me how we are going
> to use the map function with the AsyncSink ElementConverter.
>
> > public class SampleToCloudWatchMetricInputMapper
> >         implements MapFunction<Sample, CloudWatchMetricInput>
>
> Is CloudWatchMetricInput a newly introduced model class? I couldn't find
> it in SDK v2. If we are introducing it, it would be useful to add it to
> the FLIP, since it is part of the API.
>
>
> > Supports both Bounded (Batch) and Unbounded (Streaming)
>
> How do you propose to handle them differently? I can't find anything
> specific about this in the FLIP.
>
> Regarding table API
>
> > 'metric.dimension.keys' = 'cw_dim',
>
> I am not in favor of doing this, as it will complicate the schema
> validation on table creation. Maybe we can use the whole schema as
> dimensions, excluding the values and the count. Let me know your thoughts
> here.
>
> > 'metric.name.key' = 'cw_metric_name',
>
> So we are making the metric name part of the row data? Have we considered
> not doing that, and instead having one table map to one metric rather
> than to one namespace? It might be more suitable to enforce some
> validations on the dimensions schema this way. Of course, this will
> probably require us to introduce some intermediate class in the model to
> hold the dimensions, values and counts without the metric name and
> namespace, which we would extract from the sink definition. Let me know
> your thoughts here.
>
>
> >`cw_value` BIGINT,
> Are we going to allow all numeric types for values?
>
> >    protected void submitRequestEntries(
>           List<MetricDatum> requestEntries,
>           Consumer<List<MetricDatum>> requestResult)
>
> nit: This method should be deprecated after 1.20. I hope the repo is
> upgraded by the time we implement this
>
> > Error Handling
> Aside from poison pills, what error handling are you suggesting? Are we
> following in the footsteps of the other AWS connectors with error
> classification? Is there any effort to abstract it on the AWS side?
>
> And on the topic of poison pills: if I understand correctly, that is a
> topic that has been discussed for a while. This of course breaks the
> at-least-once semantics and might be confusing to users. Additionally,
> since the CloudWatch API fails the full batch, how are you suggesting we
> identify the poison pills? I am personally in favor of global failures in
> this case, but would love to hear the feedback here.
>
>
>
> Best Regards
> Ahmed Hamdy
>
>
> On Mon, 7 Apr 2025 at 11:29, Wong, Daren <daren...@amazon.co.uk.invalid>
> wrote:
>
> > Hi Dev,
> >
> > I would like to start a discussion about FLIP: Amazon CloudWatch Metric
> > Sink Connector
> >
> https://docs.google.com/document/d/1G2sQogV8S6M51qeAaTmvpClOSvklejjEXbRFFCv_T-c/edit?usp=sharing
> >
> > This FLIP is proposing to add support for Amazon CloudWatch Metric sink
> in
> > flink-connector-aws repo. Looking forward to your feedback, thank you
> >
> > Regards,
> > Daren
> >
>
