[ 
https://issues.apache.org/jira/browse/SAMZA-1035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vladislav Sergeev updated SAMZA-1035:
-------------------------------------
    Description: 
Hello.

We use Apache Kafka as a databus and Apache Samza as a router.
I suppose good feature for samza will be returning offset of a message ,in the 
case when we transfer messages from topic A to topic B.

In our case we have SystemA that produce messages to input_topic and we route 
with some transformations that messages to topicA and topicB.
Sometimes it is usefull to collect metrics of processed messages.
We can do it when they come to method :

process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector 
messageCollector, TaskCoordinator taskCoordinator)

But when we route them to another topic like this:

messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", 
topic), keyOut, message));

We don't know what offset has message in routed topic.
As i saw in your code:
https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala

You use kafka producer that returns offsets for sent message.So it will be 
great to get it.




  was:
Hello.

We use Apache Kafka as a databus and Apache Samza as a router.
I suppose good feature for samza will be returning offset of a message ,in the 
case when we transfer messages from topic A to topic B.

In our case we have SystemA that produce messages to input_topic and we route 
with some transformations that messages to topicA and topicB.
Some times it is usefull to collect metrics of processed messages.
We can do it when they come to method :

process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector 
messageCollector, TaskCoordinator taskCoordinator)

But when we route them to another topic like this:

messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", 
topic), keyOut, message));

We don't know what offset has message in routed topic.
As i saw in your code:
https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala

You use kafka producer that returns offsets for sent message.So it will be 
great to get it.





> Return offset of produced message
> ---------------------------------
>
>                 Key: SAMZA-1035
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1035
>             Project: Samza
>          Issue Type: Wish
>            Reporter: Vladislav Sergeev
>            Priority: Minor
>
> Hello.
> We use Apache Kafka as a databus and Apache Samza as a router.
> I suppose good feature for samza will be returning offset of a message ,in 
> the case when we transfer messages from topic A to topic B.
> In our case we have SystemA that produce messages to input_topic and we route 
> with some transformations that messages to topicA and topicB.
> Sometimes it is usefull to collect metrics of processed messages.
> We can do it when they come to method :
> process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector 
> messageCollector, TaskCoordinator taskCoordinator)
> But when we route them to another topic like this:
> messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", 
> topic), keyOut, message));
> We don't know what offset has message in routed topic.
> As i saw in your code:
> https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
> You use kafka producer that returns offsets for sent message.So it will be 
> great to get it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to