Hi Ahmed,

Thanks for the detailed feedback and questions.

Just a quick note that after gathering your and Hong’s feedback, I have updated 
the design to remove the “CloudWatchMetricInput” model and introduce 
“MetricWriteRequest” instead. We will expose an element converter that users 
can implement to convert from a generic input type to the static output type, 
i.e. “MetricWriteRequest”.
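
For illustration, here is a minimal sketch of what such a user-implemented 
converter could look like. The “Sample” type is the one from your earlier 
question, and the MetricWriteRequest builder methods shown are assumptions 
based on the current FLIP draft, not final API:

    import org.apache.flink.api.connector.sink2.SinkWriter;
    import org.apache.flink.connector.base.sink.writer.ElementConverter;

    // Sketch only: "Sample" is a hypothetical user type, and the
    // MetricWriteRequest builder methods are illustrative assumptions.
    public class SampleElementConverter
            implements ElementConverter<Sample, MetricWriteRequest> {

        @Override
        public MetricWriteRequest apply(Sample sample, SinkWriter.Context context) {
            // Map the user's domain object onto the sink's static request type.
            return MetricWriteRequest.builder()
                    .withMetricName(sample.getMetricName())
                    .addDimension("InstanceId", sample.getInstanceId())
                    .addValue(sample.getValue())
                    .build();
        }
    }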


> What about the Python API? This is something we should consider ahead of 
> time, since the abstract element converter doesn't have a Flink type mapping 
> to be used from Python; this is an issue we faced with DDB before


For the Python DataStream API, I experimented with it, and one approach I could 
think of is to expose an "ElementConverter<CompositeType<MapTypeInfo<String, 
String>>, MetricWriteRequest>" in the “flink-python” package [1] that users can 
set in the SinkBuilder to convert from a Flink type to “MetricWriteRequest”. The 
reason for a MapTypeInfo input is similar to the Table API: the converter needs 
to know the mapping of fields to the “MetricWriteRequest” attributes, i.e. 
dimensions, values, etc. I am aware that fixing MapTypeInfo as the input might 
be limiting here, but it’s the best approach I can think of for now; let me 
know if you have any thoughts and suggestions on this.

Another option is “ElementConverter<CompositeType<RowTypeInfo>, 
MetricWriteRequest>”, but this has its own limitations as well.
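
To illustrate the MapTypeInfo idea in Java terms, a rough sketch of the 
pre-built converter that “flink-python” could ship is below. The reserved 
“metricName” key and the “dimension.” prefix convention are assumptions I am 
using purely for illustration:

    import java.util.Map;

    import org.apache.flink.api.connector.sink2.SinkWriter;
    import org.apache.flink.connector.base.sink.writer.ElementConverter;

    // Sketch only: each record is a Map whose well-known keys identify the
    // metric name and dimensions; the key conventions are assumptions.
    public class MapElementConverter
            implements ElementConverter<Map<String, String>, MetricWriteRequest> {

        @Override
        public MetricWriteRequest apply(Map<String, String> element, SinkWriter.Context context) {
            MetricWriteRequest.Builder builder = MetricWriteRequest.builder()
                    .withMetricName(element.get("metricName")); // reserved key (assumption)
            for (Map.Entry<String, String> entry : element.entrySet()) {
                if (entry.getKey().startsWith("dimension.")) { // prefix convention (assumption)
                    builder.addDimension(
                            entry.getKey().substring("dimension.".length()), entry.getValue());
                }
            }
            return builder.build();
        }
    }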

In addition, after discussing with others who are more familiar with the 
“flink-python” package in our AWS connector repo, I realised it is currently 
not ready for implementation and we still rely on the main flink-python repo 
for connectors. As such, I plan to exclude the Python DataStream API from this 
design but will consider the above ElementConverter as a potential future 
addition.



> Is CloudWatchMetricInput a newly introduced model class, I couldn't find it 
> in the sdkv2, If we are introducing it then it might be useful to add to the 
> FLIP since this is part of the API.


Yes, it is a newly introduced model class (now renamed to “MetricWriteRequest” 
per the update above). Thanks, I have added it to the FLIP.
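
For readers following the thread, a condensed sketch of the shape I have in 
mind is below; the authoritative definition is in the FLIP, and the field 
names may still change:

    import java.io.Serializable;
    import java.util.List;

    import software.amazon.awssdk.services.cloudwatch.model.Dimension;
    import software.amazon.awssdk.services.cloudwatch.model.StatisticSet;

    // Condensed, illustrative sketch; builder and getters omitted for brevity.
    public class MetricWriteRequest implements Serializable {
        private String metricName;
        private List<Dimension> dimensions;  // name/value pairs
        private List<Double> values;         // optional
        private List<Double> counts;         // optional, parallel to values
        private StatisticSet statisticSet;   // optional alternative to values/counts
    }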



> I am not quite sure I follow, are you suggesting we introduce a specific new 
> converter class or relay that to users? Also since you mentioned FLIP-171, 
> are you suggesting to implement this sink as an extension to Async Sink? In 
> that case it is more confusing to me how we are going to use the map function 
> with the AsyncSink.ElementConverter.


Thanks for pointing this out; Hong made a suggestion in his comment that I 
think helped clarify this as well.

We will expose an “ElementConverter” that converts from a user-defined input 
type to a static output type called “MetricWriteRequest”. Hence, users can 
provide their own converter logic and freely define their own input 
type.
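
As a usage sketch, wiring it into the sink could look roughly like this; the 
sink builder class and setter names are placeholders, not final API:

    // Placeholder names for the sink builder; only the ElementConverter
    // contract is fixed by the design.
    CloudWatchMetricSink<Sample> sink =
            CloudWatchMetricSink.<Sample>builder()
                    .setNamespace("MyApp/Metrics")
                    .setElementConverter(new SampleElementConverter())
                    .build();

    sampleStream.sinkTo(sink);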



> Supports both Bounded (Batch) and Unbounded (Streaming): what do you propose 
> to handle them differently? I can't find a specific thing in the FLIP


Thanks for flagging this. Based on the current sink design, I think it should 
be able to support both batch and streaming mode without requiring different 
ways of handling or batching the records in the AsyncSinkWriter, but please let 
me know if there’s a gap in my understanding here.



> I am not in favor of doing this, as this will complicate the schema 
> validation on table creation; maybe we can use the whole schema as 
> dimensions, excluding the values and the count. Let me know your thoughts 
> here.


I agree it complicates the schema, and yes, I am open to exploring options to 
simplify it.

My thinking is that if we exclude the values and counts from the schema, we 
need another way to tell the sink which columns in the RowData correspond to 
the values and counts that should go into the CW MetricDatum. The challenge 
here is that we can’t expose an ElementConverter for Table API/SQL use cases. 
Therefore, the alternatives I can think of are:


* Option 1: Make “values” and “counts” reserved column names used in the 
conversion from RowData to MetricDatum. The downside is that users may want to 
name their columns differently based on their use case/context, or may want to 
use these names for something else.
* Option 2: Introduce a set of prefixes to identify them, e.g. “VALUES_”, 
“COUNTS_”. This approach is not worth it in my view, as it merely shifts the 
complexity from the table options to the table column naming.


Also, note that values and counts are optional here, so users can choose not 
to define them in the table options if, for example, they are using a 
StatisticSet instead of Values and Counts. Making these options optional also 
helps simplify the usage of the schema.
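
To make the table-side mapping concrete, here is a simplified sketch of how 
the sink could translate a row into a CW MetricDatum once the relevant column 
positions have been resolved from the schema and table options. It assumes 
STRING dimension columns and a single DOUBLE value column:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.table.data.RowData;

    import software.amazon.awssdk.services.cloudwatch.model.Dimension;
    import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;

    public final class RowDataToMetricDatum {

        // Simplified sketch: positions are resolved once from the table schema
        // and the 'metric.*' table options before any rows are processed.
        static MetricDatum convert(
                RowData row,
                int metricNamePos,
                String[] dimensionNames,
                int[] dimensionPos,
                int valuePos) {
            List<Dimension> dimensions = new ArrayList<>();
            for (int i = 0; i < dimensionPos.length; i++) {
                dimensions.add(Dimension.builder()
                        .name(dimensionNames[i])
                        .value(row.getString(dimensionPos[i]).toString())
                        .build());
            }
            return MetricDatum.builder()
                    .metricName(row.getString(metricNamePos).toString())
                    .dimensions(dimensions)
                    .value(row.getDouble(valuePos))
                    .build();
        }
    }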



> So we are making the metric part of the row data? Have we considered not 
> doing that, and instead having 1 table map to 1 metric instead of namespace? 
> It might be more suitable to enforce some validations on the dimensions 
> schema this way. Of course this will probably have us introduce some 
> intermediate class in the model to hold the dimensions, values and counts 
> without the metric name and namespace, which we would extract from the sink 
> definition; let me know your thoughts here?


Yes, the proposal is to make metricName part of the row data, and yes, that 
means 1 table per namespace.

The reason for 1 table per namespace is that it aligns with the CW 
PutMetricDataRequest API interface, as mentioned by Hong. The other 
consideration I had was flexibility: for example, 1 table per namespace can 
cater for 1-table-per-metricName use cases, but not vice versa.



> Are we going to allow all numeric types for values?


Yes. CloudWatch requires values to be Double, so I think we can support all 
numeric types here: DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE.
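
A small sketch of how those types could be normalised to the Double that 
CloudWatch expects (DECIMAL goes through DecimalData, everything else widens):

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.DecimalType;
    import org.apache.flink.table.types.logical.LogicalType;

    public final class NumericFieldReader {

        // Normalise any supported numeric column to a double for CloudWatch.
        static double readAsDouble(RowData row, int pos, LogicalType type) {
            switch (type.getTypeRoot()) {
                case TINYINT:
                    return row.getByte(pos);
                case SMALLINT:
                    return row.getShort(pos);
                case INTEGER:
                    return row.getInt(pos);
                case BIGINT:
                    return row.getLong(pos);
                case FLOAT:
                    return row.getFloat(pos);
                case DOUBLE:
                    return row.getDouble(pos);
                case DECIMAL:
                    DecimalType decimalType = (DecimalType) type;
                    return row.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale())
                            .toBigDecimal()
                            .doubleValue();
                default:
                    throw new IllegalArgumentException("Unsupported value type: " + type);
            }
        }
    }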



> protected void submitRequestEntries(List<MetricDatum> requestEntries, 
> Consumer<List<MetricDatum>> requestResult) nit: This method should be 
> deprecated after 1.20. I hope the repo is upgraded by the time we implement 
> this


Thanks for flagging this. I have updated the example to be compatible with 
Flink 2.0.
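
For reference, the updated hook follows the Flink 2.0 AsyncSinkWriter 
contract, which replaces the deprecated Consumer callback with a 
ResultHandler; a sketch of the new shape (body elided):

    // Flink 2.0 style: the Consumer<List<MetricDatum>> callback is replaced
    // by a ResultHandler that completes, retries, or fails the batch.
    @Override
    protected void submitRequestEntries(
            List<MetricDatum> requestEntries, ResultHandler<MetricDatum> resultHandler) {
        // Build and send the PutMetricData call here, then signal the outcome via
        // resultHandler.complete() / retryForEntries(...) / completeExceptionally(...).
    }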



> Away from poison pills, what error handling are you suggesting? Are we 
> following the footsteps of the other AWS connectors with error 
> classification, is there any effort to abstract it on the AWS side?


Currently I am following the footsteps of the other AWS connectors for error 
classification. I did a quick search and did not see a JIRA raised to abstract 
it, so I will raise one as a separate feature-improvement JIRA. However, we are 
also going to expose a config that allows users to choose their error failover 
behavior, i.e. fail fast, retry, or drop the batch.
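
As a sketch of what that configuration surface could look like (the option 
key and enum name below are placeholders, not settled names):

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    public class CloudWatchSinkOptions {

        // Placeholder enum naming the three failover behaviors discussed below.
        public enum ErrorHandlingBehavior { FAIL, RETRY, DROP }

        // Placeholder option key; the final name is not settled yet.
        public static final ConfigOption<ErrorHandlingBehavior> ERROR_HANDLING_BEHAVIOR =
                ConfigOptions.key("sink.error-handling.behavior")
                        .enumType(ErrorHandlingBehavior.class)
                        .defaultValue(ErrorHandlingBehavior.FAIL)
                        .withDescription("How the sink reacts when CloudWatch rejects a batch.");
    }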



> And on the topic of poison pills, if I understand correctly that is a topic 
> that has been discussed for a while. This of course breaks the at-least-once 
> semantic and might be confusing to the users. Additionally, since the 
> CloudWatch API fails the full batch, how are you suggesting we identify the 
> poison pills? I am personally in favor of global failures in this case but 
> would love to hear the feedback here.


This is a good point. After gathering feedback from others, the current way 
forward is to expose a config for users to decide and control the error 
handling behavior (a sketch of how this could be applied in the writer follows 
the list below):

Option 1: Fail the job.
Option 2: Keep retrying the batch.
Option 3: Drop the batch.
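
Tying the two together, a rough sketch of how the configured behavior could be 
applied to a failed batch in the writer, again assuming the Flink 2.0 
ResultHandler and the placeholder names above:

    // Sketch only: "behavior" holds the user-configured ErrorHandlingBehavior.
    private void handleFailedBatch(
            List<MetricDatum> batch,
            Exception error,
            ResultHandler<MetricDatum> resultHandler) {
        switch (behavior) {
            case FAIL:
                resultHandler.completeExceptionally(error); // Option 1: fail the job
                break;
            case RETRY:
                resultHandler.retryForEntries(batch);       // Option 2: requeue the batch
                break;
            case DROP:
                resultHandler.complete();                   // Option 3: drop the batch
                break;
        }
    }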

Once again, thanks for the feedback and questions. Please let me know if 
there's anything I can help with or expand on further.

Regards,
Daren

[1] 
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors/kinesis.py



On 07/04/2025, 21:26, "Ahmed Hamdy" <hamdy10...@gmail.com> wrote:


Hi Daren, thanks for the FLIP


Just a couple of questions and comments:


> Usable in both DataStream and Table API/SQL
What about the Python API? This is something we should consider ahead of
time, since the abstract element converter doesn't have a Flink type mapping
to be used from Python; this is an issue we faced with DDB before


> Therefore, the connector will provide a CloudWatchMetricInput model that
user can use to pass as input to the connector. For example, in DataStream
API, it could be a MapFunction called just before passing to the sink as
follows:
I am not quite sure I follow, are you suggesting we introduce a
specific new converter class or relay that to users? Also since you
mentioned FLIP-171, are you suggesting to implement this sink as an
extension to Async Sink? In that case it is more confusing to me how we are
going to use the map function with the AsyncSink.ElementConverter.


> public class SampleToCloudWatchMetricInputMapper implements
> MapFunction<Sample, CloudWatchMetricInput>


Is CloudWatchMetricInput a newly introduced model class, I couldn't find it
in the sdkv2, If we are introducing it then it might be useful to add to
the FLIP since this is part of the API.




> Supports both Bounded (Batch) and Unbounded (Streaming)


What do you propose to handle them differently? I can't find a specific
thing in the FLIP


Regarding the Table API


> 'metric.dimension.keys' = 'cw_dim',


I am not in favor of doing this, as this will complicate the schema
validation on table creation; maybe we can use the whole schema as
dimensions, excluding the values and the count. Let me know your thoughts
here.


> 'metric.name.key' = 'cw_metric_name',


So we are making the metric part of the row data? Have we considered not
doing that, and instead having 1 table map to 1 metric instead of namespace?
It might be more suitable to enforce some validations on the dimensions
schema this way. Of course this will probably have us introduce some
intermediate class in the model to hold the dimensions, values and counts
without the metric name and namespace, which we would extract from the sink
definition; let me know your thoughts here?




>`cw_value` BIGINT,
Are we going to allow all numeric types for values?


> protected void submitRequestEntries(
List<MetricDatum> requestEntries,
Consumer<List<MetricDatum>> requestResult)


nit: This method should be deprecated after 1.20. I hope the repo is
upgraded by the time we implement this


> Error Handling
Away from poison pills, what error handling are you suggesting? Are we
following the footsteps of the other AWS connectors with error
classification, is there any effort to abstract it on the AWS side?


And on the topic of poison pills, if I understand correctly that is a topic
that has been discussed for a while. This of course breaks the at-least-once
semantic and might be confusing to the users. Additionally, since the
CloudWatch API fails the full batch, how are you suggesting we identify the
poison pills? I am personally in favor of global failures in this case but
would love to hear the feedback here.






Best Regards
Ahmed Hamdy




On Mon, 7 Apr 2025 at 11:29, Wong, Daren <daren...@amazon.co.uk.invalid>
wrote:


> Hi Dev,
>
> I would like to start a discussion about FLIP: Amazon CloudWatch Metric
> Sink Connector
> https://docs.google.com/document/d/1G2sQogV8S6M51qeAaTmvpClOSvklejjEXbRFFCv_T-c/edit?usp=sharing
>
>
> This FLIP is proposing to add support for Amazon CloudWatch Metric sink in
> flink-connector-aws repo. Looking forward to your feedback, thank you
>
> Regards,
> Daren
>


