Hi,
quick, somewhat related question:
There is no CloudWatch metrics reporter for Flink yet. Would it make sense
to add one, or is that not necessary because CloudWatch supports other
metrics reporters (StatsD?)

Best,
Robert



On Wed, Apr 16, 2025 at 10:50 AM Daren Wong <darenwkt....@gmail.com> wrote:

> Hi devs,
>
> Thanks Hong for the help with moving the Draft into a FLIP.
>
> I have opened a Vote thread for the FLIP here
> https://lists.apache.org/thread/48rdzjbh8wvo8dr9q15vlsh1f5cojx4q.
>
> I will keep this discussion thread open, so please feel free to continue
> leaving any feedback and questions here. Thank you very much.
>
> Regards,
> Daren
>
>
> > On 14 Apr 2025, at 21:22, Wong, Daren <daren...@amazon.co.uk.INVALID>
> wrote:
> >
> > Hi Keith,
> >
> > Thanks for the feedback and questions
> >
> >> 1. Can you elaborate on the possible configuration keys and their
> examples for CloudWatchSinkProperties?
> >
> > In addition to the standard AsyncSink configuration keys, e.g.
> “sink.batch.max-size”, the other possible configuration keys for the Table API
> are:
> >
> > - metric.namespace // Required
> > - metric.name.key // Optional
> > - metric.dimension.keys // Optional
> > - metric.value.key // Optional
> > - metric.count.key // Optional
> > - metric.unit.key // Optional
> > - metric.storage-resolution.key // Optional
> > - metric.timestamp.key // Optional
> > - metric.statistic.max.key // Optional
> > - metric.statistic.min.key // Optional
> > - metric.statistic.sum.key // Optional
> > - metric.statistic.sample-count.key // Optional
> > - sink.invalid-metric.retry-mode // Optional
> >
> > At a high level, there are 3 types of keys for the Table API:
> >
> > * metric.namespace - Required in every CW PutMetricDataRequest
> > * metric.X.key - Column key identifier to map the Table column to the
> respective field in the CW PutMetricDataRequest. For example,
> “metric.timestamp.key = my_timestamp” means the TableSink will look for
> the column name/field “my_timestamp” and extract its value to be used as
> the timestamp in the CW PutMetricDataRequest.
> > * sink.invalid-metric.retry-mode - Error handling behavior when an
> invalid record is present, e.g. one with an invalid timestamp.
> >
> > Note that in DataStream API, users do not need to specify “metric.X.key”
> as the elementConverter is exposed to them to achieve the same purpose.
> >
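For illustration, the "metric.X.key" column mapping described above could be sketched as follows. This is only a rough sketch under assumptions: the class MetricColumnResolver, its resolve method, and the use of plain maps for the options and the row are hypothetical and are not taken from the FLIP.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not from the FLIP): resolves "metric.X.key" options against one row. */
final class MetricColumnResolver {
    private static final String PREFIX = "metric.";
    private static final String SUFFIX = ".key";

    /** Returns field name X mapped to the row value of the column configured in "metric.X.key". */
    static Map<String, Object> resolve(Map<String, String> options, Map<String, Object> row) {
        Map<String, Object> fields = new LinkedHashMap<>();
        for (Map.Entry<String, String> option : options.entrySet()) {
            String key = option.getKey();
            boolean isColumnMapping = key.startsWith(PREFIX)
                    && key.endsWith(SUFFIX)
                    && key.length() > PREFIX.length() + SUFFIX.length();
            if (!isColumnMapping) {
                // metric.namespace, metric.dimension.keys and sink.* options are skipped here.
                continue;
            }
            String field = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
            String column = option.getValue();
            if (row.containsKey(column)) {
                fields.put(field, row.get(column));
            }
        }
        return fields;
    }
}
```

With "metric.timestamp.key = my_timestamp", the value of the "my_timestamp" column ends up under the field name "timestamp". The plural "metric.dimension.keys" option would need separate handling, which this sketch leaves out.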
> >
> >> 2. I see that MetricWriteRequest's unit field is of String type. Is
> there a motivation for using the String type as opposed to the StandardUnit
> enum type? The enum would cut down on user error by shifting the correctness
> check left to the IDE/compiler.
> >
> > The motivation for using String is to avoid exposing any AWS SDK-specific
> models directly to the user of the connector. However, I plan to add
> validation in the sink to fail and provide a faster feedback loop to the user
> if an unknown/invalid unit is set.
> >
> >
> >> 3. Do the three options provided for error handling behaviour apply
> specifically to just the old-metrics failure case, or to all 400s or 500s as
> well? Will users benefit from a broader/more flexible error handling
> configuration? For example, desirable behaviour might be to i) fail the job on
> a permission issue, ii) drop old records that would be rejected by CW, iii)
> retry on throttles, 500s or timeouts.
> >
> > The currently proposed option is aligned with i) and iii), where a fatal
> exception (e.g. invalid permissions) will fail the job, and a non-fatal
> exception (e.g. throttling) will be retried by default. The more fine-grained
> control offered to the user here is specifically around InvalidRecordException,
> where the user can select any of the 3 options for retry mode via
> "sink.invalid-metric.retry-mode".
> >
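The decision logic described above could be sketched roughly as follows. All names here (ErrorHandlingSketch, ErrorKind, InvalidMetricRetryMode, Action) are hypothetical and chosen only for illustration. The three retry-mode values mirror the options quoted in this thread (drop the records, fail the job, keep retrying the batch), not the actual option values in the FLIP.

```java
/** Hypothetical sketch of the error handling described in the thread; not the FLIP's API. */
final class ErrorHandlingSketch {
    /** How a failed PutMetricData call is classified (assumed categories). */
    enum ErrorKind { FATAL, RETRYABLE, INVALID_RECORD }

    /** Assumed values for "sink.invalid-metric.retry-mode". */
    enum InvalidMetricRetryMode { DROP, FAIL_JOB, RETRY }

    /** What the sink does with the failed batch. */
    enum Action { FAIL_JOB, RETRY_BATCH, DROP_RECORDS }

    static Action decide(ErrorKind kind, InvalidMetricRetryMode mode) {
        if (kind == ErrorKind.FATAL) {
            return Action.FAIL_JOB; // e.g. invalid permissions
        }
        if (kind == ErrorKind.RETRYABLE) {
            return Action.RETRY_BATCH; // e.g. throttling
        }
        // Invalid record: only here does the user-selected retry mode apply.
        switch (mode) {
            case DROP:
                return Action.DROP_RECORDS;
            case FAIL_JOB:
                return Action.FAIL_JOB;
            default:
                return Action.RETRY_BATCH;
        }
    }
}
```

The retry mode is consulted only for invalid records, so fatal and retryable errors behave the same regardless of how "sink.invalid-metric.retry-mode" is set.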
> >
> >> 4. Should we consider bisecting or retrying the remainder of the batch if
> CW PutMetricDataResponse provides sufficient information on which
> MetricDatum is rejected?
> >
> > Yes, I have explored this, and PutMetricDataResponse does not provide
> sufficient information on which MetricDatum is rejected. As for bisecting,
> Flink AsyncSink does not currently support it, so we would need a custom
> implementation, which increases complexity. In addition, I would
> think that if we want to do bisecting, it should be done in the Flink AsyncSink
> interface, as other connectors can benefit from this as well.
> >
> >
> >
> >> 5. On the batching of requests, how do you propose batch size
> (specifically size) is enforced? Specifically, I am interested in how we
> are calculating the data sizes of a batch.
> >
> >
> > * maxBatchSize - This is the number of Records (“MetricWriteRequest” or
> "MetricDatum") in a single batch to be sent in a single
> PutMetricDataRequest. CW sets a hard limit of 1000 for this, as mentioned
> by Hong.
> > * maxRecordSizeInBytes - This is the size of a single Record
> (“MetricWriteRequest” or "MetricDatum") in bytes. CW does not expose a method
> to get the size in bytes. Therefore, the sink will estimate the size based on
> the attributes in the MetricDatum, where a double is treated as 8 bytes, and
> string size is estimated based on UTF-8/ASCII encoding (as per the CW doc).
> E.g. a MetricDatum with a metric name (string length 10) and 5 values and
> counts (double) will have 10 + 5*8*2 = 90 bytes. Most of the size is
> contributed by the string fields like dimensions, followed by the values and
> counts arrays.
> > * maxBatchSizeInBytes - This is the size of all records in a single
> batch to be flushed. CW PutMetricDataRequest has a hard limit of 1MB which
> will be the hard limit of this parameter as well.
> >
> >
> > Therefore, a sample set of limits for these configs could be:
> maxBatchSize = 1000, maxRecordSizeInBytes = 1000, maxBatchSizeInBytes =
> 1000*1000
> >
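As a worked version of the size estimate above, here is a minimal sketch that assumes a double counts as 8 bytes and strings are measured by their UTF-8 length. The class MetricSizeEstimator and its method signature are hypothetical. The real sink may count additional fields such as unit or timestamp.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical estimator following the rule of thumb described in the thread. */
final class MetricSizeEstimator {
    private static final int DOUBLE_BYTES = 8;

    /** Estimates the size of one record as name + dimensions + values + counts. */
    static long estimateBytes(
            String metricName,
            Map<String, String> dimensions,
            double[] values,
            double[] counts) {
        long size = utf8Length(metricName);
        for (Map.Entry<String, String> dimension : dimensions.entrySet()) {
            size += utf8Length(dimension.getKey()) + utf8Length(dimension.getValue());
        }
        size += (long) DOUBLE_BYTES * values.length;
        size += (long) DOUBLE_BYTES * counts.length;
        return size;
    }

    private static int utf8Length(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }
}
```

For a 10-character metric name with no dimensions and 5 values plus 5 counts, this gives 10 + 5*8*2 = 90 bytes, matching the example in the message.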
> > Regards,
> > Daren
> >
> >
> > On 14/04/2025, 09:17, "Keith Lee" <leekeiabstract...@gmail.com> wrote:
> >
> >
> >
> > Hello Daren,
> >
> >
> > Thank you for the FLIP. Questions below:
> >
> >
> > 1. Can you elaborate on the possible configuration keys and their
> > examples for CloudWatchSinkProperties?
> >
> >
> > 2. I see that MetricWriteRequest's unit field is of String type. Is there a
> > motivation for using the String type as opposed to the StandardUnit enum
> > type? The enum would cut down on user error by shifting the correctness
> > check left to the IDE/compiler.
> > -
> >
> https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/cloudwatch/model/StandardUnit.html
> >
> >
> >> In addition, CloudWatch rejects any metric that’s more than 2 weeks old,
> > we will add a configurable option for users to determine the error
> handling
> > behavior of either: 1) drop the records OR 2) trigger a job failure OR 3)
> > keep retrying the batch.
> >
> >
> > 3. Do the three options provided for error handling behaviour apply
> > specifically to just the old-metrics failure case, or to all 400s or 500s as
> > well? Will users benefit from a broader/more flexible error handling
> > configuration? For example, desirable behaviour might be to i) fail the job
> > on a permission issue, ii) drop old records that would be rejected by CW,
> > iii) retry on throttles, 500s or timeouts.
> >
> >
> >> If the batch contains one MetricDatum poison pill, the request will fail
> > and be handled as a fully failed request.
> >
> >
> > 4. Should we consider bisecting or retrying the remainder of the batch if CW
> > PutMetricDataResponse provides sufficient information on which MetricDatum
> > is rejected?
> >
> >
> >> A list of MetricWriteRequest will be batched based on maxBatchSize which
> > is then submitted as a PutMetricDataRequest.
> >
> >
> > 5. On the batching of requests, how do you propose batch size
> (specifically
> > size) is enforced? Specifically, I am interested in how we are
> calculating
> > the data sizes of a batch.
> >
> >
> > Best regards
> > Keith Lee
> >
> >
> >
> >
> > On Fri, Apr 11, 2025 at 6:43 PM Wong, Daren <daren...@amazon.co.uk.invalid>
> > wrote:
> >
> >
> >> Hi Hong,
> >>
> >> Thanks for the comments and suggestions, really appreciate it!
> >>
> >>
> >>> Clarify CloudWatch API limitations and handling in the sink. [1] Great
> >> to see we're being explicit with the Java API model! Let's be explicit
> >> about the PutMetricData API constraints and how we handle them (it would
> >> be good to have a separate section):
> >>> 1. Request size and count. Maximum size per request is 1MB, with
> >> limit of 1000 metrics. We need to enforce this during batching to
> prevent
> >> users from shooting themselves in the foot!
> >>> 2. For Values/Counts, limit is 150 unique metrics within a single
> >> request.
> >>> 3. Data type limitations. CloudWatch API uses Java double, but it
> >> doesn't support Double.NaN. We need to be explicit to handle improperly
> >> formatted data. We can consider failing fast/slow as you have suggested.
> >> Consider using "StrictEntityValidation" in the failure handling. [1]
> (For
> >> the design, we can simply mention, but work out the details when we
> >> implement)
> >>> 4. Timestamp limitations. Cloudwatch also has limitations around
> >> accepted timestamps (as you have noted). Metric data can be 48h in the
> past
> >> or 2h in the future. Let's clarify how we handle invalid values.
> >>> 5. Data ordering. CW API doesn't seem to specify limitations
> >> around out-of-order / repeat data. That's great, and it would be good
> to be
> >> explicit about and validate this behavior.
> >>
> >>
> >> This is very detailed, thank you. I have updated the FLIP to outline these
> >> limitations. In summary, here’s how they translate to limitations in the
> >> AsyncSink configuration:
> >>
> >>
> >> * Maximum size per CW PutMetricDataRequest is 1MB → maxBatchSizeInBytes
> >> cannot be more than 1 MB
> >> * Maximum number of MetricDatum per CW PutMetricDataRequest is 1000 →
> >> maxBatchSize cannot be more than 1000
> >> * Maximum 150 unique values in MetricDatum.Values → maxRecordSizeInBytes
> >> cannot be more than 150 bytes (assuming each value is 1 byte in size)
> >> * CloudWatch API uses Java double, but it doesn't support Double.NaN →
> >> Use StrictEntityValidation
> >> * MetricDatum Timestamp limitations (up to 2 weeks in the past and up to
> >> 2 hours into the future) → Validate against this, with a user choice of
> >> error handling behavior for this case
> >> * Data ordering. Yes, I have validated that CW accepts out-of-order data,
> >> and I have updated the FLIP to point this out.
> >>
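A minimal sketch of pre-flight checks for some of the limits listed above. It assumes the figures quoted in this message (two weeks in the past, two hours into the future, no Double.NaN, at most 1000 MetricDatum per request). The class MetricDatumChecks and its methods are hypothetical and are not part of the FLIP or the AWS SDK.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical client-side checks mirroring the CloudWatch limits quoted in the thread. */
final class MetricDatumChecks {
    static final Duration MAX_AGE = Duration.ofDays(14);    // up to 2 weeks in the past
    static final Duration MAX_FUTURE = Duration.ofHours(2); // up to 2 hours into the future
    static final int MAX_BATCH_SIZE = 1000;                 // MetricDatum per request

    /** True if the timestamp lies inside the accepted window relative to "now". */
    static boolean isTimestampAccepted(Instant timestamp, Instant now) {
        return !timestamp.isBefore(now.minus(MAX_AGE))
                && !timestamp.isAfter(now.plus(MAX_FUTURE));
    }

    /** True if the value is not NaN, which the API does not support. */
    static boolean isValueAccepted(double value) {
        return !Double.isNaN(value);
    }

    /** True if the configured maxBatchSize stays within the per-request limit. */
    static boolean isBatchSizeAccepted(int maxBatchSize) {
        return maxBatchSize > 0 && maxBatchSize <= MAX_BATCH_SIZE;
    }
}
```

Passing "now" in explicitly keeps the timestamp check deterministic, which makes it easier to unit test than reading the system clock inside the method.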
> >>
> >>
> >>
> >>> The PutMetricData API supports two data modes, EntityMetricData and
> >> MetricData [1]. Since we are only supporting MetricData for now, let's
> make
> >> sure our interface allows the extension to support EntityMetricData in
> the
> >> future. For example, we should make sure we use our own data model
> classes
> >> instead of using AWS SDK classes. Also, we currently propose to use
> >> wrappers instead of primitives. Let's use the primitive where we can
> >>
> >>
> >> Yes, agreed on making the interface allow extension to support
> >> EntityMetricData in the future.
> >>
> >> We are using our own data model “MetricWriteRequest” and have updated the
> >> FLIP to use primitives.
> >>
> >>
> >>
> >>> - PutMetricData supports StrictEntityValidation [1]. As mentioned
> >> above, let's use this.
> >>> - I like that we enforce a single namespace per sink, since that is
> >> aligned with the PutMetricData API interface. Let's be explicit on the
> >> reasoning in the FLIP!
> >>> - Clarify sink data semantics. Since we're using the async sink, we
> >> only provide at-least-once semantics. Let’s make this guarantee
> explicit.
> >>
> >>
> >> Agree and updated FLIP
> >>
> >>
> >>
> >>> 4. CW sink interface. Currently we are proposing to have a static input
> >> data type instead of generic input type. This would require user to use
> a
> >> map separately (As you have noted). For future extensibility, I would
> >> prefer if we exposed an ElementConverter directly to the user. That
> way, we
> >> can provide a custom class "MetricWriteRequest" in the output interface
> of
> >> the ElementConverter that can be extended to support additional features
> >> (e.g. EntityMetricData) in the future.
> >>
> >>
> >> Thanks, I agree with both suggestions: exposing ElementConverter to the
> >> user, and providing a custom class “MetricWriteRequest” in the output for
> >> extensibility. Updated the FLIP as well.
> >>
> >>
> >>
> >>> 5. Table API design.
> >>> - I'm not a fan of the way we currently use dimensions in the
> >> properties.
> >>> - It would be better to use a Flink-native SQL support like PRIMARY
> >> KEY instead [2]. This also enforces that the specified dimension cannot
> be
> >> null.
> >>
> >>
> >> Thanks for the suggestion, but I also see a limitation in this approach
> >> when the user wants to define more than one dimension column with PRIMARY
> >> KEY, and CloudWatch also allows dimensions to be optional. Hence, I see
> >> the current approach as being more flexible for the user to configure. Let
> >> me know what your thoughts are here.
> >>
> >>
> >>
> >>> 6. Housekeeping
> >>> - It would be good to tidy up the public interfaces linked. For
> >> example, we don't make any explicit usage of the public interfaces in
> >> FLIP-191, so we can remove that.
> >>
> >>
> >> Thanks for raising this, agreed and have updated the FLIP.
> >>
> >>
> >> Regards,
> >> Daren
> >>
> >> On 08/04/2025, 12:02, "Hong Liang" <h...@apache.org> wrote:
> >>
> >>
> >>
> >>
> >> Hi Daren,
> >>
> >>
> >> Thanks for the contribution — exciting to see support for new sinks!
> I’ve
> >> added a few comments and suggestions below:
> >>
> >>
> >> 1. Clarify CloudWatch API limitations and handling in the sink. [1]
> >> Great to see we're being explicit with the Java API model! Let's be
> >> explicit about the PutMetricData API constraints and how we handle them
> >> (it would be good to have a separate section):
> >> 1. Request size and count. Maximum size per request is 1MB, with
> >> limit of 1000 metrics. We need to enforce this during batching to
> prevent
> >> users from shooting themselves in the foot!
> >> 2. For Values/Counts, limit is 150 unique metrics within a single
> >> request.
> >> 3. Data type limitations. CloudWatch API uses Java double, but it
> >> doesn't support Double.NaN. We need to be explicit to handle improperly
> >> formatted data. We can consider failing fast/slow as you have suggested.
> >> Consider using "StrictEntityValidation" in the failure handling. [1]
> (For
> >> the design, we can simply mention, but work out the details when we
> >> implement)
> >> 4. Timestamp limitations. Cloudwatch also has limitations around
> >> accepted timestamps (as you have noted). Metric data can be 48h in the
> past
> >> or 2h in the future. Let's clarify how we handle invalid values.
> >> 5. Data ordering. CW API doesn't seem to specify limitations around
> >> out-of-order / repeat data. That's great, and it would be good to be
> >> explicit about and validate this behavior.
> >> 2. Clarify supported features [1]
> >> - The PutMetricData API supports two data modes, EntityMetricData and
> >> MetricData [1]. Since we are only supporting MetricData for now, let's
> make
> >> sure our interface allows the extension to support EntityMetricData in
> the
> >> future. For example, we should make sure we use our own data model
> classes
> >> instead of using AWS SDK classes. Also, we currently propose to use
> >> wrappers instead of primitives. Let's use the primitive where we can :).
> >> - PutMetricData supports StrictEntityValidation [1]. As mentioned
> >> above, let's use this.
> >> - I like that we enforce a single namespace per sink, since that is
> >> aligned with the PutMetricData API interface. Let's be explicit on the
> >> reasoning in the FLIP!
> >> 3. Clarify sink data semantics. Since we're using the async sink, we
> only
> >> provide at-least-once semantics. Let’s make this guarantee explicit.
> >> 4. CW sink interface. Currently we are proposing to have a static input
> >> data type instead of generic input type. This would require user to use
> a
> >> map separately (As you have noted). For future extensibility, I would
> >> prefer if we exposed an ElementConverter directly to the user. That
> way, we
> >> can provide a custom class "MetricWriteRequest" in the output interface
> of
> >> the ElementConverter that can be extended to support additional features
> >> (e.g. EntityMetricData) in the future.
> >> 5. Table API design.
> >> - I'm not a fan of the way we currently use dimensions in the
> >> properties.
> >> - It would be better to use a Flink-native SQL support like PRIMARY KEY
> >> instead [2]. This also enforces that the specified dimension cannot be
> >> null.
> >> 6. Housekeeping
> >> - It would be good to tidy up the public interfaces linked. For example,
> >> we don't make any explicit usage of the public interfaces in FLIP-191,
> so
> >> we can remove that.
> >>
> >>
> >>
> >>
> >> Overall, nice FLIP! Thanks for the detail and making it an easy read.
> Hope
> >> the above helps!
> >>
> >>
> >>
> >>
> >> Cheers,
> >> Hong
> >>
> >>
> >>
> >>
> >> [1]
> >>
> >>
> https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html
> >>
> >>
> >> [2]
> >>
> >>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#primary-key
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> On Mon, Apr 7, 2025 at 9:24 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
> >>
> >>
> >>> Hi Daren, thanks for the FLIP.
> >>>
> >>> Just a couple of questions and comments:
> >>>
> >>>> Usable in both DataStream and Table API/SQL
> >>> What about the Python API? This is something we should consider ahead of
> >>> time, since the abstract element converter doesn't have a Flink type
> >>> mapping to be used from Python. This is an issue we faced with DDB before.
> >>>
> >>>> Therefore, the connector will provide a CloudWatchMetricInput model
> >> that
> >>> user can use to pass as input to the connector. For example, in
> >> DataStream
> >>> API, it could be a MapFunction called just before passing to the sink
> as
> >>> follows:
> >>> I am not quite sure I follow. Are you suggesting we introduce a
> >>> specific new converter class or relay that to users? Also, since you
> >>> mentioned FLIP-171, are you suggesting to implement this sink as an
> >>> extension to Async Sink? In that case, it is more confusing to me how we
> >>> are going to use the map function with the AsyncSink.ElementConverter.
> >>>
> >>>> public class SampleToCloudWatchMetricInputMapper implements
> MapFunction<
> >>> Sample, CloudWatchMetricInput>
> >>>
> >>> Is CloudWatchMetricInput a newly introduced model class? I couldn't find
> >>> it in the SDK v2. If we are introducing it, then it might be useful to add
> >>> it to the FLIP, since this is part of the API.
> >>>
> >>>
> >>>> Supports both Bounded (Batch) and Unbounded (Streaming)
> >>>
> >>> What do you propose to handle them differently? I can't find a specific
> >>> thing in the FLIP
> >>>
> >>> Regarding table API
> >>>
> >>>> 'metric.dimension.keys' = 'cw_dim',
> >>>
> >>> I am not in favor of doing this as this will complicate the schema
> >>> validation on table creation, maybe we can use the whole schema as
> >>> dimensions excluding the values and the count, let me know your
> thoughts
> >>> here.
> >>>
> >>>> 'metric.name.key' = 'cw_metric_name',
> >>>
> >>> So we are making the metric part of the row data? Have we considered not
> >>> doing that, and instead having 1 table map to 1 metric rather than to a
> >>> namespace? It might be more suitable to enforce some validations on the
> >>> dimensions schema this way. Of course, this will probably have us
> >>> introduce some intermediate class in the model to hold the dimensions,
> >>> values and counts without the metric name and namespace, which we will
> >>> extract from the sink definition. Let me know your thoughts here.
> >>>
> >>>
> >>>> `cw_value` BIGINT,
> >>> Are we going to allow all numeric types for values?
> >>>
> >>>> protected void submitRequestEntries(
> >>> List<MetricDatum> requestEntries,
> >>> Consumer<List<MetricDatum>> requestResult)
> >>>
> >>> nit: This method should be deprecated after 1.20. I hope the repo is
> >>> upgraded by the time we implement this
> >>>
> >>>> Error Handling
> >>> Apart from poison pills, what error handling are you suggesting? Are we
> >>> following in the footsteps of the other AWS connectors with error
> >>> classification? Is there any effort to abstract it on the AWS side?
> >>>
> >>> And on the topic of poison pills, if I understand correctly, that is a
> >>> topic that has been discussed for a while. This of course breaks the
> >>> at-least-once semantic and might be confusing to users. Additionally,
> >>> since the CloudWatch API fails the full batch, how are you suggesting we
> >>> identify the poison pills? I am personally in favor of global failures in
> >>> this case, but would love to hear the feedback here.
> >>>
> >>>
> >>>
> >>> Best Regards
> >>> Ahmed Hamdy
> >>>
> >>>
> >>> On Mon, 7 Apr 2025 at 11:29, Wong, Daren <daren...@amazon.co.uk.invalid>
> >>> wrote:
> >>>
> >>>> Hi Dev,
> >>>>
> >>>> I would like to start a discussion about FLIP: Amazon CloudWatch
> Metric
> >>>> Sink Connector
> >>>>
> >>>
> >>
> https://docs.google.com/document/d/1G2sQogV8S6M51qeAaTmvpClOSvklejjEXbRFFCv_T-c/edit?usp=sharing
> >>>>
> >>>> This FLIP proposes adding support for an Amazon CloudWatch Metric sink
> >>>> in the flink-connector-aws repo. Looking forward to your feedback, thank
> >>>> you.
> >>>>
> >>>> Regards,
> >>>> Daren
> >>>>
> >>>
> >>
> >>
> >>
> >>
> >
> >
> >
>
>
