imaffe opened a new pull request #18674:
URL: https://github.com/apache/flink/pull/18674


   Since the sink code is still evolving, I am opening this PR for discussion
purposes only for now. I will create a new PR against the Flink repo once the
main code is merged.
   
   1. Test code
   I think the test code belongs in `PulsarWriterTest` (the unit test). Since
@syhily is currently working on the unit tests, I'll add the corresponding
test once that work is done.
   
   2. How to retrieve `currentSendTime`
   
   In
[FLIP-33](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics),
 `currentSendTime` is an optional metric that should reflect the time spent
sending out the last record.
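   To make the FLIP-33 definition concrete, here is a minimal sketch of
measuring the time spent sending the last record with an asynchronous client:
the start time is captured when the record is handed off, and the elapsed time
is stored when the returned future completes. `LastSendTimeTracker` and its
methods are hypothetical names, the `CompletableFuture` stands in for whatever
the client's async send returns (such as the Pulsar client's
`Producer#sendAsync`), and registering the value as a Flink gauge is left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper: remembers how long the most recent async send took. */
final class LastSendTimeTracker {
    // Written by the client's callback thread, read by the metric reporter thread.
    private volatile long lastSendTimeMillis = 0L;

    /** Wraps an asynchronous send and records its latency once it completes. */
    <T> CompletableFuture<T> track(Supplier<CompletableFuture<T>> send) {
        long startNanos = System.nanoTime();
        // whenComplete returns a future that completes only after this action ran.
        return send.get().whenComplete((result, error) -> {
            // Record the elapsed time whether the send succeeded or failed.
            lastSendTimeMillis = (System.nanoTime() - startNanos) / 1_000_000L;
        });
    }

    /** The value a currentSendTime gauge would report. */
    long getCurrentSendTimeMillis() {
        return lastSendTimeMillis;
    }

    public static void main(String[] args) {
        LastSendTimeTracker tracker = new LastSendTimeTracker();
        // Simulated send that is acknowledged after roughly 50 ms.
        tracker.track(() -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "ack";
        })).join();
        System.out.println("currentSendTime ~ " + tracker.getCurrentSendTimeMillis() + " ms");
    }
}
```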
   
   
   However, the Kafka sink does not strictly follow this description. Instead,
it uses the average send latency from the Kafka client metrics as
`currentSendTime`
([link](https://github.com/apache/flink/blob/7e43674abc2281af51ad83b2e3972f7ffb3d2c7b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L358)).
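   To illustrate how the two semantics can diverge, the hypothetical
`SendTimeStats` class below tracks both the last send time and an average. It
is single-threaded for simplicity, and the plain cumulative mean is only a
stand-in for the Kafka client's metric, which the client computes itself.

```java
/** Hypothetical illustration of "last record" vs "average" send time. */
final class SendTimeStats {
    private long last;
    private long total;
    private long count;

    /** Records the send time (in ms) of one record. */
    void record(long sendTimeMillis) {
        last = sendTimeMillis;
        total += sendTimeMillis;
        count++;
    }

    /** FLIP-33 style: time spent sending the most recent record. */
    long last() {
        return last;
    }

    /** Closer to what the Kafka sink reports: an average over many sends. */
    double average() {
        return count == 0 ? 0.0 : (double) total / count;
    }

    public static void main(String[] args) {
        SendTimeStats stats = new SendTimeStats();
        for (long t : new long[] {10, 20, 90}) {
            stats.record(t);
        }
        // prints "last = 90 ms, average = 40.0 ms"
        System.out.println("last = " + stats.last() + " ms, average = " + stats.average() + " ms");
    }
}
```

   After sends of 10, 20 and 90 ms, the last-record value is 90 ms while the
average is 40 ms, so the two metrics can tell quite different stories.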
   
   Currently I use a `volatile long` to store the send time of each message. I
believe `volatile` is safe here because `lastSendTime` is written without
depending on values read from another thread, but this part is worth a closer
look.
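   A minimal sketch of the reasoning above, assuming send-completion callbacks
may run on a different thread than the one reading the metric. Because
`lastSendTime` is simply overwritten, a `volatile` field is enough for readers
to see the latest value, and the JLS also makes reads and writes of a
`volatile long` atomic (§17.7). An accumulated value such as a total, by
contrast, is a read-modify-write that `volatile` alone cannot protect, so it
uses an `AtomicLong`. `SendMetrics` and its members are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of the thread-safety argument for lastSendTime. */
final class SendMetrics {
    // Plain overwrite: the new value never depends on the old one,
    // so volatile is enough for other threads to see the latest write.
    private volatile long lastSendTime;

    // Read-modify-write: "total += x" on a volatile field is not atomic and
    // concurrent updates could be lost, so an AtomicLong is used instead.
    private final AtomicLong totalSendTime = new AtomicLong();

    void onSendCompleted(long sendTimeMillis) {
        lastSendTime = sendTimeMillis;
        totalSendTime.addAndGet(sendTimeMillis);
    }

    long lastSendTime() {
        return lastSendTime;
    }

    long totalSendTime() {
        return totalSendTime.get();
    }

    public static void main(String[] args) throws InterruptedException {
        SendMetrics metrics = new SendMetrics();
        // Several "callback" threads report completed sends concurrently.
        Thread[] callbacks = new Thread[4];
        for (int i = 0; i < callbacks.length; i++) {
            callbacks[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    metrics.onSendCompleted(1);
                }
            });
            callbacks[i].start();
        }
        for (Thread t : callbacks) {
            t.join();
        }
        // prints "total = 40000, last = 1": no accumulated update is lost
        System.out.println("total = " + metrics.totalSendTime() + ", last = " + metrics.lastSendTime());
    }
}
```

   If several callback threads write `lastSendTime` concurrently, the gauge
reports whichever write landed last, which seems acceptable for a
"last record" metric.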
   
   A more general design doc on how to retrieve the metrics can be found here:
https://docs.google.com/document/d/1rEFs-_9LRK_g5fcYl6GECjaR57xFpZ2PnfJu5093gsg/edit#
 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
