IGNITE-4140 KafkaStreamer should use tuple extractor instead of decoders
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dfb44ba2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dfb44ba2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dfb44ba2 Branch: refs/heads/ignite-4371 Commit: dfb44ba2dca0cec44568239e318cf6863ed0c16e Parents: ca8ab2d Author: Anil <[email protected]> Authored: Wed Dec 7 12:06:38 2016 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Wed Dec 7 12:06:38 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/stream/StreamAdapter.java | 4 +- .../ignite/stream/kafka/KafkaStreamer.java | 48 +++++--------------- .../kafka/KafkaIgniteStreamerSelfTest.java | 36 +++++++++++---- 3 files changed, 40 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/dfb44ba2/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java index cb9566b..3f1dfad 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java @@ -179,8 +179,8 @@ public abstract class StreamAdapter<T, K, V> { } else { Map<K, V> m = multipleTupleExtractor.extract(msg); - - if (m != null) + + if (m != null && !m.isEmpty()) stmr.addData(m); } http://git-wip-us.apache.org/repos/asf/ignite/blob/dfb44ba2/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java index f46ee93..5767790 100644 --- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java +++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java @@ -28,7 +28,6 @@ import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; -import kafka.serializer.Decoder; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -45,7 +44,7 @@ import org.apache.ignite.stream.StreamAdapter; * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example">Consumer Consumer Group * Example</a> */ -public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> { +public class KafkaStreamer<K, V> extends StreamAdapter<MessageAndMetadata<byte[], byte[]>, K, V> { /** Retry timeout. */ private static final long DFLT_RETRY_TIMEOUT = 10000; @@ -64,12 +63,6 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> { /** Kafka consumer config. */ private ConsumerConfig consumerCfg; - /** Key decoder. */ - private Decoder<K> keyDecoder; - - /** Value decoder. */ - private Decoder<V> valDecoder; - /** Kafka consumer connector. */ private ConsumerConnector consumer; @@ -107,24 +100,6 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> { } /** - * Sets the key decoder. - * - * @param keyDecoder Key decoder. - */ - public void setKeyDecoder(Decoder<K> keyDecoder) { - this.keyDecoder = keyDecoder; - } - - /** - * Sets the value decoder. - * - * @param valDecoder Value decoder. - */ - public void setValueDecoder(Decoder<V> valDecoder) { - this.valDecoder = valDecoder; - } - - /** * Sets the retry timeout. * * @param retryTimeout Retry timeout. @@ -144,10 +119,10 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> { A.notNull(getStreamer(), "streamer"); A.notNull(getIgnite(), "ignite"); A.notNull(topic, "topic"); - A.notNull(keyDecoder, "key decoder"); - A.notNull(valDecoder, "value decoder"); A.notNull(consumerCfg, "kafka consumer config"); A.ensure(threads > 0, "threads > 0"); + A.ensure(null != getSingleTupleExtractor() || null != getMultipleTupleExtractor(), + "Extractor must be configured"); log = getIgnite().log(); @@ -157,10 +132,9 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> { topicCntMap.put(topic, threads); - Map<String, List<KafkaStream<K, V>>> consumerMap = - consumer.createMessageStreams(topicCntMap, keyDecoder, valDecoder); + Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCntMap); - List<KafkaStream<K, V>> streams = consumerMap.get(topic); + List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // Now launch all the consumer threads. executor = Executors.newFixedThreadPool(threads); @@ -168,16 +142,18 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> { stopped = false; // Now create an object to consume the messages. - for (final KafkaStream<K, V> stream : streams) { + for (final KafkaStream<byte[], byte[]> stream : streams) { executor.submit(new Runnable() { @Override public void run() { while (!stopped) { try { - for (ConsumerIterator<K, V> it = stream.iterator(); it.hasNext() && !stopped; ) { - MessageAndMetadata<K, V> msg = it.next(); + MessageAndMetadata<byte[], byte[]> msg; + + for (ConsumerIterator<byte[], byte[]> it = stream.iterator(); it.hasNext() && !stopped; ) { + msg = it.next(); try { - getStreamer().addData(msg.key(), msg.message()); + addMessage(msg); } catch (Exception e) { U.error(log, "Message is ignored due to an error [msg=" + msg + ']', e); @@ -224,4 +200,4 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> { } } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/dfb44ba2/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java index 4918f87..102b647 100644 --- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java +++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java @@ -28,14 +28,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import kafka.consumer.ConsumerConfig; -import kafka.serializer.StringDecoder; -import kafka.utils.VerifiableProperties; +import kafka.message.MessageAndMetadata; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.events.CacheEvent; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.stream.StreamMultipleTupleExtractor; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.kafka.clients.producer.ProducerRecord; @@ -146,7 +146,7 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest { */ private void consumerStream(String topic, Map<String, String> keyValMap) throws TimeoutException, InterruptedException { - KafkaStreamer<String, String, String> kafkaStmr = null; + KafkaStreamer<String, String> kafkaStmr = null; Ignite ignite = grid(); @@ -173,13 +173,29 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest { kafkaStmr.setThreads(4); // Set the consumer configuration. - kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(embeddedBroker.getZookeeperAddress(), "groupX")); - - // Set the decoders. - StringDecoder strDecoder = new StringDecoder(new VerifiableProperties()); - - kafkaStmr.setKeyDecoder(strDecoder); - kafkaStmr.setValueDecoder(strDecoder); + kafkaStmr.setConsumerConfig( + createDefaultConsumerConfig(embeddedBroker.getZookeeperAddress(), "groupX")); + + kafkaStmr.setMultipleTupleExtractor( + new StreamMultipleTupleExtractor<MessageAndMetadata<byte[], byte[]>, String, String>() { + @Override public Map<String, String> extract(MessageAndMetadata<byte[], byte[]> msg) { + Map<String, String> entries = new HashMap<>(); + + try { + String key = new String(msg.key()); + String val = new String(msg.message()); + + // Convert the message into number of cache entries with same key or dynamic key from actual message. + // For now using key as cache entry key and value as cache entry value - for test purpose. + entries.put(key, val); + } + catch (Exception ex) { + fail("Unexpected error." + ex); + } + + return entries; + } + }); // Start kafka streamer. kafkaStmr.start();
