[ 
https://issues.apache.org/jira/browse/IGNITE-4140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15702469#comment-15702469
 ] 

Anil commented on IGNITE-4140:
------------------------------


Hi [~avinogradov] 

Could you please review the changes? Thanks.

KafkaDataStreamer can be used when:

1. The key of the Kafka message is the key of the cache entry, and the Kafka 
message payload is the value of the cache entry.
2. A single Kafka message actually carries a number of cache entries.

A tuple extractor is mandatory for both #1 and #2.
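The following is a rough sketch of the two extractor shapes for cases #1 and #2, for illustration only. It assumes UTF-8 string keys and values. For case #2 it uses an invented payload format, "k1=v1;k2=v2". RawMessage and the two interfaces are hypothetical local stand-ins. They only mirror the shape of Kafka's MessageAndMetadata<byte[], byte[]> and Ignite's StreamSingleTupleExtractor / StreamMultipleTupleExtractor, so the snippet compiles without either library. It is not the code in the patch.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

public class ExtractorSketch {
    /** Hypothetical stand-in for MessageAndMetadata<byte[], byte[]> (key and payload only). */
    static final class RawMessage {
        private final byte[] key;
        private final byte[] message;
        RawMessage(byte[] key, byte[] message) { this.key = key; this.message = message; }
        byte[] key() { return key; }
        byte[] message() { return message; }
    }

    /** Same shape as StreamSingleTupleExtractor<T, K, V>: one message -> one entry. */
    interface SingleExtractor<T, K, V> { Map.Entry<K, V> extract(T msg); }

    /** Same shape as StreamMultipleTupleExtractor<T, K, V>: one message -> many entries. */
    interface MultipleExtractor<T, K, V> { Map<K, V> extract(T msg); }

    private static String utf8(byte[] bytes) { return new String(bytes, StandardCharsets.UTF_8); }

    /** Case #1: Kafka key becomes the cache key, Kafka payload becomes the cache value. */
    static final SingleExtractor<RawMessage, String, String> KEY_VALUE =
        msg -> new AbstractMap.SimpleEntry<>(utf8(msg.key()), utf8(msg.message()));

    /** Case #2: one payload such as "k1=v1;k2=v2" (assumed format) yields several entries. */
    static final MultipleExtractor<RawMessage, String, String> BATCH = msg -> {
        Map<String, String> entries = new HashMap<>();
        for (String pair : utf8(msg.message()).split(";")) {
            if (pair.isEmpty())
                continue;
            String[] kv = pair.split("=", 2);
            entries.put(kv[0], kv.length > 1 ? kv[1] : "");
        }
        return entries;
    };

    public static void main(String[] args) {
        RawMessage single = new RawMessage("user:1".getBytes(StandardCharsets.UTF_8),
            "Alice".getBytes(StandardCharsets.UTF_8));
        System.out.println(KEY_VALUE.extract(single)); // user:1=Alice

        RawMessage batch = new RawMessage(null, "a=1;b=2".getBytes(StandardCharsets.UTF_8));
        System.out.println(BATCH.extract(batch).size()); // 2
    }
}
```

In case #2 the Kafka key is ignored, because each entry's key comes from the payload itself.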

I see two possible issues with KafkaStreamer and KafkaDataStreamer:

a. The order of messages is not guaranteed, because the data streamer does not 
guarantee ordering by design.
b. A "streamer closed" exception may be thrown when a node rejoins.

#a - if message order is a must, entries must be added to the cache directly 
instead of through the data streamer.
#b - could you please share your inputs?
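This is a minimal sketch of what adding entries to the cache directly could look like for #a. Each extracted entry is written synchronously, in consumption order, so the last message consumed for a key wins. The put callback stands in for something like IgniteCache.put. Msg is a hypothetical, already-decoded message. The ordering only holds for messages from a single Kafka partition read by one consumer thread, since Kafka orders messages only within a partition. The likely cost is throughput: one write per message instead of the data streamer's batching.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class OrderedWriteSketch {
    /** Hypothetical consumed message whose key and value are already decoded. */
    static final class Msg {
        final String key;
        final String value;
        Msg(String key, String value) { this.key = key; this.value = value; }
    }

    /**
     * Applies messages strictly one by one. Each put returns before the next message
     * is handled, so for a given key the last consumed message is the one that stays.
     */
    static void applyInOrder(List<Msg> consumed, BiConsumer<String, String> put) {
        for (Msg m : consumed)
            put.accept(m.key, m.value);
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>(); // stand-in for the Ignite cache
        List<Msg> consumed = new ArrayList<>();
        consumed.add(new Msg("order:42", "CREATED"));
        consumed.add(new Msg("order:42", "PAID"));
        consumed.add(new Msg("order:42", "SHIPPED"));

        applyInOrder(consumed, cache::put);
        System.out.println(cache.get("order:42")); // SHIPPED
    }
}
```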

Thanks



> KafkaStreamer should use tuple extractor instead of decoders
> ------------------------------------------------------------
>
>                 Key: IGNITE-4140
>                 URL: https://issues.apache.org/jira/browse/IGNITE-4140
>             Project: Ignite
>          Issue Type: Improvement
>          Components: streaming
>    Affects Versions: 1.7
>            Reporter: Valentin Kulichenko
>            Assignee: Anil
>             Fix For: 2.0
>
>
> The current design of {{KafkaStreamer}} looks incorrect to me. In particular, it 
> extends {{StreamAdapter}}, but ignores the tuple extractors provided there and 
> uses native Kafka decoders instead. This, for example, makes it impossible to 
> produce several entries from one message, as can be done via 
> {{StreamMultipleTupleExtractor}} in other streamers.
> To fix this, we should:
> # Declare the {{KafkaStreamer}} like this:
> {code}
> KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[], byte[]>, K, V>
> {code}
> # Remove {{keyDecoder}} and {{valDecoder}} in favor of tuple extractors.
> # Instead of calling {{getStreamer().addData(...)}} directly, call the 
> {{addMessage(...)}} method with the raw message consumed from Kafka 
> ({{MessageAndMetadata<byte[], byte[]>}}). This method will make sure that the 
> configured extractor is invoked and that all entries are added to 
> {{IgniteDataStreamer}}.
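Step 3 of the proposal roughly amounts to the dispatch below. This is a simplified, hypothetical stand-in, not Ignite's actual StreamAdapter. In it, Sink plays the role of IgniteDataStreamer and the extractors are plain java.util.function.Function values. Raw messages are strings rather than MessageAndMetadata<byte[], byte[]>.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class AdapterSketch {
    /** Hypothetical stand-in for IgniteDataStreamer<K, V>. */
    interface Sink<K, V> { void addData(Map<K, V> entries); }

    /** Minimal adapter: one of the two extractors is expected to be configured. */
    static class Adapter<T, K, V> {
        private final Sink<K, V> sink;
        private Function<T, Map.Entry<K, V>> single; // one message -> one entry
        private Function<T, Map<K, V>> multiple;     // one message -> many entries

        Adapter(Sink<K, V> sink) { this.sink = sink; }
        void setSingleTupleExtractor(Function<T, Map.Entry<K, V>> f) { single = f; }
        void setMultipleTupleExtractor(Function<T, Map<K, V>> f) { multiple = f; }

        /** Runs the configured extractor on the raw message and forwards its entries. */
        void addMessage(T msg) {
            if (multiple != null) {
                Map<K, V> entries = multiple.apply(msg);
                if (entries != null && !entries.isEmpty())
                    sink.addData(entries);
            }
            else if (single != null) {
                Map.Entry<K, V> e = single.apply(msg);
                if (e != null)
                    sink.addData(Collections.singletonMap(e.getKey(), e.getValue()));
            }
            else
                throw new IllegalStateException("No tuple extractor configured");
        }
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> received = new ArrayList<>();
        Adapter<String, String, Integer> adapter = new Adapter<>(received::add);

        // Raw messages here look like "a=1,b=2" (assumed format for the example).
        adapter.setMultipleTupleExtractor(raw -> {
            Map<String, Integer> m = new HashMap<>();
            for (String kv : raw.split(",")) {
                String[] p = kv.split("=", 2);
                m.put(p[0], Integer.parseInt(p[1]));
            }
            return m;
        });
        adapter.addMessage("a=1,b=2");
        System.out.println(received.get(0).size()); // 2
    }
}
```

With this approach, the Kafka-specific part is limited to consuming raw messages and passing each one to addMessage(...). All key/value decoding moves into the user-supplied extractor.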



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
