[ https://issues.apache.org/jira/browse/IGNITE-4140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15715036#comment-15715036 ]
ASF GitHub Bot commented on IGNITE-4140:
----------------------------------------

GitHub user adasari reopened a pull request:

    https://github.com/apache/ignite/pull/1296

    IGNITE-4140 KafkaDataStreamer - added new kafka streamer which uses the extractor.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/adasari/ignite IGNITE-4140

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/ignite/pull/1296.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1296

----
commit 15168221f38327aca89dfd7160fe473434526463
Author: unknown <ani...@anilkd-t450.jnpr.net>
Date:   2016-11-28T12:29:10Z

    StreamAdapter - added empty check to avoid exception
    KafkaDataStreamer - added new kafka streamer which uses the extractor.

commit b8bcc773604d0115764f86303a8f3ae7b14b666d
Author: unknown <ani...@anilkd-t450.jnpr.net>
Date:   2016-12-02T12:37:45Z

    Removed KafkaDataStreamer and refactored KafkaStreamer

commit 27a30b5f9b0a2bcd088b508929e37cec422eedf8
Author: unknown <ani...@anilkd-t450.jnpr.net>
Date:   2016-12-02T12:39:04Z

    Refactoring kafka streamer

----

> KafkaStreamer should use tuple extractor instead of decoders
> ------------------------------------------------------------
>
>                 Key: IGNITE-4140
>                 URL: https://issues.apache.org/jira/browse/IGNITE-4140
>             Project: Ignite
>          Issue Type: Improvement
>          Components: streaming
>   Affects Versions: 1.7
>            Reporter: Valentin Kulichenko
>            Assignee: Anil
>              Labels: patch-available
>             Fix For: 2.0
>
>
> The current design of {{KafkaStreamer}} looks incorrect to me. In particular,
> it extends {{StreamAdapter}}, but ignores the tuple extractors provided there
> and uses native Kafka decoders instead. This, for example, makes it impossible
> to produce several entries from one message, as can be done via
> {{StreamMultipleTupleExtractor}} in other streamers.
> To fix this, we should:
> # Declare the {{KafkaStreamer}} like this:
> {code}
> KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[], byte[]>, K, V>
> {code}
> # Remove {{keyDecoder}} and {{valDecoder}} in favor of tuple extractors.
> # Instead of calling {{getStreamer().addData(...)}} directly, call the
> {{addMessage(...)}} method, passing the raw message consumed from Kafka
> ({{MessageAndMetadata<byte[], byte[]>}}). This method will make sure that
> the configured extractor is invoked and that all entries are added to
> {{IgniteDataStreamer}}.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
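
For readers following the thread, here is a rough sketch of the flow the three steps in the description propose. It is not the code from the pull request and does not use the Ignite or Kafka classes: RawMessage, MultiExtractor, AdapterSketch and KafkaStreamerSketch are hypothetical stand-ins for MessageAndMetadata<byte[], byte[]>, a multiple-tuple extractor, StreamAdapter and KafkaStreamer, and a plain map plays the role of IgniteDataStreamer. The "k=v,k=v" payload format is likewise only an assumption for the demo.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's MessageAndMetadata<byte[], byte[]>.
final class RawMessage {
    final byte[] key;
    final byte[] value;

    RawMessage(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-in for a multiple-tuple extractor: one message in, many entries out.
interface MultiExtractor<T, K, V> {
    Map<K, V> extract(T msg);
}

// Hypothetical stand-in for the adapter base class. The map stands in for IgniteDataStreamer.
abstract class AdapterSketch<T, K, V> {
    private final Map<K, V> sink = new LinkedHashMap<>();
    private MultiExtractor<T, K, V> extractor;

    void setExtractor(MultiExtractor<T, K, V> extractor) {
        this.extractor = extractor;
    }

    // Runs the configured extractor and forwards every extracted entry to the sink.
    protected void addMessage(T msg) {
        if (extractor == null)
            throw new IllegalStateException("Extractor is not configured.");

        Map<K, V> entries = extractor.extract(msg);

        // Skip messages that yield no entries.
        if (entries != null && !entries.isEmpty())
            sink.putAll(entries);
    }

    Map<K, V> sink() {
        return sink;
    }
}

// The streamer is typed on the raw message and has no key or value decoders.
final class KafkaStreamerSketch<K, V> extends AdapterSketch<RawMessage, K, V> {
    // Called for each consumed message; delegates to addMessage instead of writing to the sink directly.
    void onMessage(RawMessage msg) {
        addMessage(msg);
    }
}

final class ExtractorSketch {
    // Feeds one message with a "k=v,k=v" payload (assumed format) and returns what reached the sink.
    static Map<String, Integer> demo() {
        KafkaStreamerSketch<String, Integer> streamer = new KafkaStreamerSketch<>();

        streamer.setExtractor(msg -> {
            Map<String, Integer> out = new LinkedHashMap<>();
            String payload = new String(msg.value, StandardCharsets.UTF_8);

            for (String pair : payload.split(",")) {
                String[] kv = pair.split("=");
                out.put(kv[0], Integer.parseInt(kv[1]));
            }

            return out;
        });

        streamer.onMessage(new RawMessage(null, "a=1,b=2".getBytes(StandardCharsets.UTF_8)));

        return streamer.sink();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is that a single consumed message can yield several entries, which is the case the description says decoders cannot cover. In the real classes, the extractor would presumably be configured through the setters that StreamAdapter already provides.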