Hi Ahmed,

Thanks for the detailed feedback and questions.

Just a quick note that after gathering your and Hong's feedback, I have updated the design to remove the "CloudWatchMetricInput" model and introduce "MetricWriteRequest" instead, and we will expose an element converter that users can implement to convert from a generic input type to the static output type, i.e. "MetricWriteRequest".

> What about python API? this is sth we should consider ahead since the
> abstract element converter doesn't have a Flink type mapping to be used
> from python, this is an issue we faced with DDB before

For the Python DataStream API, I experimented with it, and one approach I could think of is to expose an "ElementConverter<CompositeType<MapTypeInfo<String, String>>, MetricWriteRequest>" in the "flink-python" package [1] that users can set in the SinkBuilder to convert from a Flink type to "MetricWriteRequest". The reason for a MapTypeInfo input is similar to the Table API - it needs to know the mapping of fields to the "MetricWriteRequest" attributes, i.e. dimensions, values, etc. I am aware that fixing MapTypeInfo as the input might be limiting here, but it is the best approach I can think of for now; let me know if you have any thoughts or suggestions on this. Another option is "ElementConverter<CompositeType<RowTypeInfo>, MetricWriteRequest>", but this has its own limitations as well.

In addition, after discussing with others who are more familiar with the "flink-python" package in our AWS connector repo, I realised it is currently not in a ready state for implementation and we still rely on the main flink-python repo for connectors. As such, I plan to exclude the Python DataStream API from this design but will consider the above ElementConverter as a potential future addition.

> Is CloudWatchMetricInput a newly introduced model class, I couldn't find
> it in the sdkv2, If we are introducing it then it might be useful to add
> to the FLIP since this is part of the API.

Yes, "MetricWriteRequest" (which replaces the earlier "CloudWatchMetricInput") is a newly introduced model class. Thanks, I have added it to the FLIP.

> I am not quite sure I follow, are you suggesting we introduce a specific
> new converter class or relay that to users? Also since you mentioned
> FLIP-171, are you suggesting to implement this sink as an extension to
> Async Sink, in that case It is more confusing to me how we are going to
> use the map function with the AsyncSink.ElementConvertor.

Thanks for pointing this out; Hong made a suggestion in his comment that I think helped clarify this as well. We will expose an "ElementConverter" that converts from a user-defined input type to the static output type "MetricWriteRequest". Hence, users can provide their own element converter logic and can freely define their own input type.
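To make this concrete, here is a minimal sketch of what such a converter could look like, assuming a hypothetical "Sample" input POJO and an illustrative builder on "MetricWriteRequest" (the FLIP defines the actual shape of that class):

    import org.apache.flink.api.connector.sink2.SinkWriter;
    import org.apache.flink.connector.base.sink.writer.ElementConverter;

    // Sketch only: "Sample" is a hypothetical user type, and the
    // MetricWriteRequest builder methods below are illustrative.
    public class SampleToMetricWriteRequestConverter
            implements ElementConverter<Sample, MetricWriteRequest> {

        @Override
        public MetricWriteRequest apply(Sample sample, SinkWriter.Context context) {
            return MetricWriteRequest.builder()
                    .withMetricName(sample.getMetricName())
                    .addDimension("InstanceId", sample.getInstanceId())
                    .addValue(sample.getValue())
                    .build();
        }
    }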
> What do you propose to handle Bounded (Batch) and Unbounded (Streaming)
> differently? I can't find a specific thing in the FLIP

Thanks for flagging this. Based on the current sink design, I think it should be able to support both batch and streaming mode without requiring different ways of handling or batching the records in the AsyncSinkWriter, but please let me know if there is a gap in my understanding here.

> I am not in favor of doing this as this will complicate the schema
> validation on table creation, maybe we can use the whole schema as
> dimensions excluding the values and the count, let me know your thoughts
> here.

I agree it complicates the schema, and yes, I am open to exploring options to simplify it. My thoughts are that if we want to exclude the values and counts from the schema, then we need another way to provide that information to the sink so that it knows which columns in the RowData correspond to the values and counts that should go into the CW MetricDatum. And I believe the challenge here is that we cannot expose an ElementConverter for Table API/SQL use cases. Therefore, the alternatives I can think of are:

* Option 1: Make "values" and "counts" reserved keywords to be used in the conversion from RowData to MetricDatum. The downside is that users may want to name their columns differently based on their use case/context, or may want to reserve these keywords for something else.

* Option 2: Introduce a set of prefixes to identify them, i.e. "VALUES_", "COUNTS_". This approach is not worth it in my view, as it merely shifts the complexity from table options to table column naming.

Also, note that values and counts are optional here, so users can choose not to define them in the table options if, for example, they are using StatisticSet instead of Values and Counts. Making these options optional also helps simplify the usage of the schema.

> So we are making the metric part of the row data? have we considered not
> doing that instead and having 1 table map to 1 metric instead of
> namespace? It might be more suitable to enforce some validations on the
> dimensions schema this way. Ofc this will probably have us introduce some
> intermediate class in the model to hold the dimensions, values and counts
> without the metric name and namespace that we will extract from the sink
> definition, let me know your thoughts here?

Yes, the proposal is to make metricName part of the row data, and yes, that means 1 table per namespace. The reason for 1 table per namespace is that it aligns with the CW PutMetricDataRequest API interface, as mentioned by Hong. Another consideration I had was flexibility: for example, 1 table per namespace can cater for 1-table-per-metricName use cases, but not vice versa.

> Are we going to allow all numeric types for values?

Yes, CloudWatch requires values to be Double, so I think we can support all numeric types here: DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE.
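As a rough sketch of that conversion (the helper name and structure are illustrative; the accessors are the standard Table API RowData ones), each supported type would be widened to the double that CloudWatch expects:

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.DecimalType;
    import org.apache.flink.table.types.logical.LogicalType;

    // Illustrative helper: read a supported numeric column from a row and
    // normalise it to the double value CloudWatch expects. Note that BIGINT
    // and DECIMAL may lose precision when widened to double.
    static double toDouble(RowData row, int pos, LogicalType type) {
        switch (type.getTypeRoot()) {
            case TINYINT:  return row.getByte(pos);
            case SMALLINT: return row.getShort(pos);
            case INTEGER:  return row.getInt(pos);
            case BIGINT:   return row.getLong(pos);
            case FLOAT:    return row.getFloat(pos);
            case DOUBLE:   return row.getDouble(pos);
            case DECIMAL:
                DecimalType dt = (DecimalType) type;
                return row.getDecimal(pos, dt.getPrecision(), dt.getScale())
                        .toBigDecimal().doubleValue();
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }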
> protected void submitRequestEntries(List<MetricDatum> requestEntries,
> Consumer<List<MetricDatum>> requestResult)
> nit: This method should be deprecated after 1.20. I hope the repo is
> upgraded by the time we implement this

Thanks for flagging this, I have updated the example to be compatible with 2.0.
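For reference, a minimal sketch of what the updated writer method could look like, assuming the newer ResultHandler-based AsyncSinkWriter callback that replaces the deprecated Consumer variant; the cloudWatchClient and namespace fields and the blanket retry are illustrative, and real error classification is elided:

    import java.util.List;
    import org.apache.flink.connector.base.sink.writer.ResultHandler;
    import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
    import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;

    // Fragment of the sink writer: cloudWatchClient (a CloudWatchAsyncClient)
    // and namespace are assumed fields of the enclosing class.
    @Override
    protected void submitRequestEntries(
            List<MetricDatum> requestEntries,
            ResultHandler<MetricDatum> resultHandler) {
        PutMetricDataRequest request = PutMetricDataRequest.builder()
                .namespace(namespace)
                .metricData(requestEntries)
                .build();
        cloudWatchClient.putMetricData(request).whenComplete((response, error) -> {
            if (error != null) {
                // CloudWatch fails the whole batch, so hand the full batch
                // back for retry; the configured failover behavior decides
                // whether to fail, retry, or drop (see below).
                resultHandler.retryForEntries(requestEntries);
            } else {
                resultHandler.complete();
            }
        });
    }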
> Away from poison pills, what error handling are you suggesting? Are we
> following the footsteps of the other AWS connectors with error
> classification, is there any effort to abstract it on the AWS side?

Currently I am following the footsteps of the other AWS connectors for error classification. I did a quick search and did not see a JIRA raised to abstract it, so I will raise it as a separate feature-improvement JIRA. However, we are going to expose a config to allow users to choose their error failover behavior, i.e. fail fast, retry, or drop the batch.

> And on the topic of poison pills, If I understand correctly that is a
> topic that has been discussed for a while, this ofc breaks the
> at-least-once semantic and might be confusing to the users, additionally
> since the cloudwatch API fails the full batch how are you suggesting we
> identify the poison pills? I am personally in favor of global failures in
> this case but would love to hear the feedback here.

This is a good point, and after gathering feedback from others, the current way forward is to expose a config for users to decide and control the error handling behavior:

Option 1: Fail the job.
Option 2: Keep retrying the batch.
Option 3: Drop the batch.
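As a sketch of how this could surface on the sink builder (the enum, setter, and sink class names below are illustrative, not final):

    // Illustrative failover strategy mirroring the three options above.
    public enum FailoverStrategy {
        FAIL,   // Option 1: fail the job when a batch cannot be written
        RETRY,  // Option 2: keep retrying the failed batch
        DROP    // Option 3: drop the failed batch and continue
    }

    // Hypothetical usage, reusing the converter sketched earlier.
    CloudWatchMetricSink<Sample> sink =
            CloudWatchMetricSink.<Sample>builder()
                    .setNamespace("MyApp/Metrics")
                    .setElementConverter(new SampleToMetricWriteRequestConverter())
                    .setFailoverStrategy(FailoverStrategy.RETRY)
                    .build();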
Once again, thanks for the feedback and questions. Please let me know if there is anything I can help with or expand further on.

Regards,
Daren

[1] https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors/kinesis.py

On 07/04/2025, 21:26, "Ahmed Hamdy" <hamdy10...@gmail.com> wrote:

Hi Daren,
Thanks for the FLIP. Just a couple of questions and comments:

> Usable in both DataStream and Table API/SQL

What about the Python API? This is something we should consider ahead of time, since the abstract element converter doesn't have a Flink type mapping to be used from Python; this is an issue we faced with DDB before.

> Therefore, the connector will provide a CloudWatchMetricInput model that
> user can use to pass as input to the connector. For example, in
> DataStream API, it could be a MapFunction called just before passing to
> the sink as follows:

I am not quite sure I follow, are you suggesting we introduce a specific new converter class or relay that to users? Also, since you mentioned FLIP-171, are you suggesting to implement this sink as an extension to Async Sink? In that case it is more confusing to me how we are going to use the map function with the AsyncSink.ElementConvertor.

> public class SampleToCloudWatchMetricInputMapper implements
> MapFunction<Sample, CloudWatchMetricInput>

Is CloudWatchMetricInput a newly introduced model class? I couldn't find it in the sdkv2. If we are introducing it then it might be useful to add it to the FLIP since this is part of the API.

> Supports both Bounded (Batch) and Unbounded (Streaming)

What do you propose to handle them differently? I can't find a specific thing in the FLIP.

Regarding the Table API:

> 'metric.dimension.keys' = 'cw_dim',

I am not in favor of doing this as this will complicate the schema validation on table creation; maybe we can use the whole schema as dimensions, excluding the values and the count. Let me know your thoughts here.

> 'metric.name.key' = 'cw_metric_name',

So we are making the metric part of the row data? Have we considered not doing that, and instead having 1 table map to 1 metric instead of a namespace? It might be more suitable to enforce some validations on the dimensions schema this way. Ofc this will probably have us introduce some intermediate class in the model to hold the dimensions, values and counts without the metric name and namespace, which we will extract from the sink definition. Let me know your thoughts here?

> `cw_value` BIGINT,

Are we going to allow all numeric types for values?

> protected void submitRequestEntries(
>     List<MetricDatum> requestEntries,
>     Consumer<List<MetricDatum>> requestResult)

nit: This method should be deprecated after 1.20. I hope the repo is upgraded by the time we implement this.

> Error Handling

Away from poison pills, what error handling are you suggesting? Are we following the footsteps of the other AWS connectors with error classification? Is there any effort to abstract it on the AWS side?

And on the topic of poison pills, if I understand correctly that is a topic that has been discussed for a while. This ofc breaks the at-least-once semantic and might be confusing to the users. Additionally, since the CloudWatch API fails the full batch, how are you suggesting we identify the poison pills? I am personally in favor of global failures in this case but would love to hear the feedback here.

Best Regards,
Ahmed Hamdy

On Mon, 7 Apr 2025 at 11:29, Wong, Daren <daren...@amazon.co.uk.invalid> wrote:

> Hi Dev,
>
> I would like to start a discussion about FLIP: Amazon CloudWatch Metric
> Sink Connector:
> https://docs.google.com/document/d/1G2sQogV8S6M51qeAaTmvpClOSvklejjEXbRFFCv_T-c/edit?usp=sharing
>
> This FLIP is proposing to add support for an Amazon CloudWatch Metric
> sink in the flink-connector-aws repo. Looking forward to your feedback,
> thank you.
>
> Regards,
> Daren