Added new Samza Kafka system consumer using the new Kafka consumer
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c0ea25cb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c0ea25cb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c0ea25cb

Branch: refs/heads/NewKafkaSystemConsumer
Commit: c0ea25cbc674a1d67546f7f47a6f36f6ee58bdc6
Parents: 7254460
Author: Boris S <[email protected]>
Authored: Wed Aug 29 10:52:30 2018 -0700
Committer: Boris S <[email protected]>
Committed: Wed Aug 29 10:52:30 2018 -0700

----------------------------------------------------------------------
 .../clients/consumer/KafkaConsumerConfig.java   | 15 ++-
 .../samza/system/kafka/KafkaConsumerProxy.java  |  7 +-
 .../samza/system/kafka/KafkaSystemFactory.scala | 59 +-----------
 .../system/kafka/NewKafkaSystemConsumer.java    | 97 +++++++++++++-------
 4 files changed, 80 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c0ea25cb/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
index 97360e2..b29a041 100644
--- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
@@ -40,6 +40,7 @@ public class KafkaConsumerConfig extends ConsumerConfig {
   private static final String SAMZA_OFFSET_SMALLEST = "smallest";
   private static final String KAFKA_OFFSET_LATEST = "latest";
   private static final String KAFKA_OFFSET_EARLIEST = "earliest";
+  private static final String KAFKA_OFFSET_NONE = "none";

   /*
    * By default, KafkaConsumer will fetch ALL available messages for all the partitions.
    * This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll().
@@ -64,16 +65,14 @@ public class KafkaConsumerConfig extends ConsumerConfig {
     consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
     consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);

-    /********************************************
-     * Open-source Kafka Consumer configuration *
-     *******************************************/
+    //Open-source Kafka Consumer configuration
     consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Disable consumer auto-commit

     consumerProps.setProperty(
         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
         getAutoOffsetResetValue(consumerProps)); // Translate samza config value to kafka config value

-    // makesure bootstrap configs are in ?? SHOULD WE FAIL IF THEY ARE NOT?
+    // make sure bootstrap configs are in ?? SHOULD WE FAIL IF THEY ARE NOT?
     if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
       // get it from the producer config
       String bootstrapServer =
           config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
@@ -139,6 +138,14 @@ public class KafkaConsumerConfig extends ConsumerConfig {
    */
   static String getAutoOffsetResetValue(Properties properties) {
     String autoOffsetReset = properties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KAFKA_OFFSET_LATEST);
+
+    // accept kafka values directly
+    if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) ||
+        autoOffsetReset.equals(KAFKA_OFFSET_LATEST) ||
+        autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
+      return autoOffsetReset;
+    }
+
     switch (autoOffsetReset) {
       case SAMZA_OFFSET_LARGEST:
         return KAFKA_OFFSET_LATEST;
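In short, getAutoOffsetResetValue() now passes Kafka-native auto.offset.reset values ("earliest", "latest", "none") straight through and only translates the legacy Samza values. A self-contained sketch of the resulting mapping; the "smallest" -> "earliest" case and the final fallback to "latest" are assumed from the method's pre-existing behavior, since the hunk above is truncated:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    // Sketch of the translation getAutoOffsetResetValue() performs.
    public class OffsetResetSketch {
      static String toKafkaOffsetReset(Properties props) {
        String v = props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        switch (v) {
          case "earliest":
          case "latest":
          case "none":
            return v;           // Kafka values are accepted directly
          case "largest":
            return "latest";    // Samza "largest" maps to Kafka "latest"
          case "smallest":
            return "earliest";  // assumed: Samza "smallest" maps to Kafka "earliest"
          default:
            return "latest";    // assumed fallback, matching the getProperty default
        }
      }

      public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
        System.out.println(toKafkaOffsetReset(p)); // -> earliest
      }
    }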
http://git-wip-us.apache.org/repos/asf/samza/blob/c0ea25cb/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 66971af..01b345a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -69,7 +69,6 @@ public class KafkaConsumerProxy<K, V> {
   private final Map<SystemStreamPartition, Long> nextOffsets = new ConcurrentHashMap<>();
   // lags behind the high water mark, as reported by the Kafka consumer.
   private final Map<SystemStreamPartition, Long> latestLags = new HashMap<>();
-  private final NewKafkaSystemConsumer.ValueUnwrapper<V> valueUnwrapper;

   private volatile boolean isRunning = false;
   private volatile Throwable failureCause = null;
@@ -77,7 +76,7 @@ public class KafkaConsumerProxy<K, V> {
   public KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId,
       NewKafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics,
-      String metricName, NewKafkaSystemConsumer.ValueUnwrapper<V> valueUnwrapper) {
+      String metricName) {

     this.kafkaConsumer = kafkaConsumer;
     this.systemName = systemName;
@@ -85,7 +84,6 @@ public class KafkaConsumerProxy<K, V> {
     this.kafkaConsumerMetrics = samzaConsumerMetrics;
     this.metricName = metricName;
     this.clientId = clientId;
-    this.valueUnwrapper = valueUnwrapper;

     // TODO - see if we need new metrics (not host:port based)
     this.kafkaConsumerMetrics.registerBrokerProxy(metricName, 0);
@@ -257,8 +255,7 @@ public class KafkaConsumerProxy<K, V> {
       //}
       final K key = r.key();
-      final Object value =
-          valueUnwrapper == null ? r.value() : valueUnwrapper.unwrapValue(ssp.getSystemStream(), r.value());
+      final Object value = r.value();
       IncomingMessageEnvelope imEnvelope =
           new IncomingMessageEnvelope(ssp, String.valueOf(r.offset()), key, value, msgSize);
       listMsgs.add(imEnvelope);
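With ValueUnwrapper removed, KafkaConsumerProxy hands each record to the sink as-is: whatever the consumer's configured deserializers produced becomes the envelope's key and message. Roughly, per record (a sketch only; in the real loop ssp and msgSize are computed from the polled record batch):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.SystemStreamPartition;

    // Sketch of the per-record conversion after this change: no unwrapping step,
    // the deserialized key/value go straight into the envelope.
    final class RecordToEnvelope {
      static <K, V> IncomingMessageEnvelope convert(ConsumerRecord<K, V> r, SystemStreamPartition ssp, int msgSize) {
        // Samza tracks offsets as strings, Kafka as longs.
        return new IncomingMessageEnvelope(ssp, String.valueOf(r.offset()), r.key(), r.value(), msgSize);
      }
    }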
http://git-wip-us.apache.org/repos/asf/samza/blob/c0ea25cb/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index c7f6aed..6a5eda9 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -55,64 +55,9 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     val clientId = KafkaUtil.getClientId("samza-consumer", config)
     val metrics = new KafkaSystemConsumerMetrics(systemName, registry)

-    // Kind of goofy to need a producer config for consumers, but we need metadata.
-    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
-    val bootstrapServers = producerConfig.bootsrapServers
-    //val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
-
-    //val kafkaConfig = new KafkaConfig(config)
-
-
-    // val timeout = consumerConfig.socketTimeoutMs
-    //val bufferSize = consumerConfig.socketReceiveBufferBytes
-    //val fetchSize = new StreamFetchSizes(consumerConfig.fetchMessageMaxBytes, config.getFetchMessageMaxBytesTopics(systemName))
-    //val consumerMinSize = consumerConfig.fetchMinBytes
-    //val consumerMaxWait = consumerConfig.fetchWaitMaxMs
-    //val autoOffsetResetDefault = consumerConfig.autoOffsetReset
-    val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
-    val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt
-    val fetchThresholdBytes = config.getConsumerFetchThresholdBytes(systemName).getOrElse("-1").toLong
-    //val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
-    //val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, timeout)
-
-
-    val kafkaConsumer: KafkaConsumer[Array[Byte], Array[Byte]] =
-      NewKafkaSystemConsumer.getKafkaConsumerImpl(systemName, clientId, config)
-
-    def valueUnwrapper: NewKafkaSystemConsumer.ValueUnwrapper[Array[Byte]] = null;// TODO add real unrapper from
-    val kc = new NewKafkaSystemConsumer (
-      kafkaConsumer, systemName, config, clientId,
-      metrics, new SystemClock, false, valueUnwrapper)
-
-    kc
-    /*
-    new KafkaSystemConsumer(
-      systemName = systemName,
-      systemAdmin = getAdmin(systemName, config),
-      metrics = metrics,
-      metadataStore = metadataStore,
-      clientId = clientId,
-      timeout = timeout,
-      bufferSize = bufferSize,
-      fetchSize = fetchSize,
-      consumerMinSize = consumerMinSize,
-      consumerMaxWait = consumerMaxWait,
-      fetchThreshold = fetchThreshold,
-      fetchThresholdBytes = fetchThresholdBytes,
-      fetchLimitByBytesEnabled = config.isConsumerFetchThresholdBytesEnabled(systemName),
-      offsetGetter = offsetGetter)
-    */
-  }
-
-  /*
-  def getKafkaConsumerImpl(systemName: String, config: KafkaConfig) = {
-    info("Consumer properties in getKafkaConsumerImpl: systemName: {}, consumerProperties: {}", systemName, config)
-
-    val byteArrayDeserializer = new ByteArrayDeserializer
-    new KafkaConsumer[Array[Byte], Array[Byte]](config.configForVanillaConsumer(),
-      byteArrayDeserializer, byteArrayDeserializer)
+    NewKafkaSystemConsumer.getNewKafkaSystemConsumer(
+      systemName, config, clientId, metrics, new SystemClock)
   }
-  */

  def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
    val clientId = KafkaUtil.getClientId("samza-producer", config)
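getConsumer() is now a thin wrapper: all the consumer wiring moved behind NewKafkaSystemConsumer.getNewKafkaSystemConsumer(). In Java terms the factory method amounts to something like this (a sketch of the Scala call site above, assuming KafkaSystemConsumerMetrics and SystemClock keep the constructors used there):

    import org.apache.samza.config.Config;
    import org.apache.samza.metrics.MetricsRegistry;
    import org.apache.samza.system.SystemConsumer;
    import org.apache.samza.util.SystemClock;

    // Java rendering of the slimmed-down getConsumer(): fetch thresholds and
    // offset-reset settings are no longer read here; the consumer reads them
    // from `config` itself.
    final class GetConsumerSketch {
      static SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry, String clientId) {
        KafkaSystemConsumerMetrics metrics = new KafkaSystemConsumerMetrics(systemName, registry);
        return NewKafkaSystemConsumer.getNewKafkaSystemConsumer(systemName, config, clientId, metrics, new SystemClock());
      }
    }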
http://git-wip-us.apache.org/repos/asf/samza/blob/c0ea25cb/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
index 26db610..dd7e584 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
@@ -21,6 +21,7 @@

 package org.apache.samza.system.kafka;

+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,29 +41,24 @@ import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.BlockingEnvelopeMap;
 import org.apache.samza.util.Clock;
+import org.apache.samza.util.KafkaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
+import scala.collection.JavaConversions;

 public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements SystemConsumer{

   private static final Logger LOG = LoggerFactory.getLogger(NewKafkaSystemConsumer.class);

-  /**
-   * Provides a way to unwrap the value further. It is used for intermediate stream messages.
-   * @param <T> value type
-   */
-  public interface ValueUnwrapper<T> {
-    Object unwrapValue(SystemStream systemStream, T value);
-  }
-
   private static final long FETCH_THRESHOLD = 50000;
   private static final long FETCH_THRESHOLD_BYTES = -1L;
   private final Consumer<K,V> kafkaConsumer;
@@ -75,7 +71,6 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
   private final AtomicBoolean started = new AtomicBoolean(false);
   private final Config config;
   private final boolean fetchThresholdBytesEnabled;
-  private final ValueUnwrapper<V> valueUnwrapper;

   // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
   private KafkaConsumerMessageSink messageSink;
@@ -99,9 +94,7 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
       Config config,
       String clientId,
       KafkaSystemConsumerMetrics metrics,
-      Clock clock,
-      boolean fetchThresholdBytesEnabled,
-      ValueUnwrapper<V> valueUnwrapper) {
+      Clock clock) {

     super(metrics.registry(),clock, metrics.getClass().getName());

@@ -109,41 +102,64 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
     this.clientId = clientId;
     this.systemName = systemName;
     this.config = config;
-    this.fetchThresholdBytesEnabled = fetchThresholdBytesEnabled;
     this.metricName = systemName + " " + clientId;
     this.kafkaConsumer = kafkaConsumer;
-    this.valueUnwrapper = valueUnwrapper;
+
+    this.fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);

     LOG.info(String.format(
         "Created SamzaLiKafkaSystemConsumer for system=%s, clientId=%s, metricName=%s with liKafkaConsumer=%s",
         systemName, clientId, metricName, this.kafkaConsumer.toString()));
   }

-  public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String systemName, String clientId, Config config) {
+  public static <K, V> NewKafkaSystemConsumer getNewKafkaSystemConsumer(
+      String systemName,
+      Config config,
+      String clientId,
+      KafkaSystemConsumerMetrics metrics,
+      Clock clock) {
+
+    // extract consumer configs and create kafka consumer
+    KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, clientId, config);
+
+    return new NewKafkaSystemConsumer(kafkaConsumer,
+        systemName,
+        config,
+        clientId,
+        metrics,
+        clock);
+  }
+
+  /**
+   * create kafka consumer
+   * @param systemName
+   * @param clientId
+   * @param config
+   * @return kafka consumer
+   */
+  private static <K, V> KafkaConsumer<K, V> getKafkaConsumerImpl(String systemName, String clientId, Config config) {

     Map<String, String> injectProps = new HashMap<>();
-    injectProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-    injectProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    // the consumer is fully typed, and deserialization can be too. But in case it is not provided we should
+    // default to byte[]
+    if ( !config.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("default key serialization for the consumer(for {}) to ByteArrayDeserializer", systemName);
+      injectProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    }
+    if ( !config.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("default value serialization for the consumer(for {}) to ByteArrayDeserializer", systemName);
+      injectProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    }

+    // extract kafka consumer configs
     KafkaConsumerConfig consumerConfig =
         KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId, injectProps);

     LOG.info("==============>Consumer properties in getKafkaConsumerImpl: systemName: {}, consumerProperties: {}",
         systemName, consumerConfig.originals());

-    /*
-    Map<String, Object> kafkaConsumerConfig = consumerConfig.originals().entrySet().stream()
-        .collect(Collectors.toMap((kv)->kv.getKey(), (kv)->(Object)kv.getValue()));
-*/
-
-    return new KafkaConsumer<byte[], byte[]>(consumerConfig.originals());
-  }
-
-  /**
-   * return system name for this consumer
-   * @return system name
-   */
-  public String getSystemName() {
-    return systemName;
+    return new KafkaConsumer<>(consumerConfig.originals());
   }

@@ -156,7 +172,7 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
       LOG.warn("attempting to start a stopped consumer");
       return;
     }
-LOG.info("==============>About to start consumer");
+    LOG.info("==============>About to start consumer");
     // initialize the subscriptions for all the registered TopicPartitions
     startSubscription();
     LOG.info("==============>subscription started");
@@ -193,7 +209,7 @@ LOG.info("==============>About to start consumer");
     // create the thread with the consumer
     proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink,
-        samzaConsumerMetrics, metricName, valueUnwrapper);
+        samzaConsumerMetrics, metricName);
     LOG.info("==============>Created consumer proxy: " + proxy);
   }

@@ -363,6 +379,23 @@ LOG.info("==============>About to start consumer");
     return new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition()));
   }

+  /**
+   * return system name for this consumer
+   * @return system name
+   */
+  public String getSystemName() {
+    return systemName;
+  }
+
+  private static Set<SystemStream> getIntermediateStreams(Config config) {
+    StreamConfig streamConfig = new StreamConfig(config);
+    Collection<String> streamIds = JavaConversions.asJavaCollection(streamConfig.getStreamIds());
+    return streamIds.stream()
+        .filter(streamConfig::getIsIntermediateStream)
+        .map(id -> streamConfig.streamIdToSystemStream(id))
+        .collect(Collectors.toSet());
+  }
+
   ////////////////////////////////////
   // inner class for the message sink
   ////////////////////////////////////
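One behavioral note on getKafkaConsumerImpl(): an explicitly configured key/value deserializer now wins, and ByteArrayDeserializer is injected only as a fallback, so the consumer can be fully typed end to end. A compact sketch of just that defaulting decision (hypothetical helper; in the real code the injected props are merged by KafkaConsumerConfig.getKafkaSystemConsumerConfig):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    // Sketch: inject byte[] deserializers only when the user config omits them.
    final class DeserializerDefaults {
      static Map<String, String> injectProps(Map<String, String> userConfig) {
        Map<String, String> inject = new HashMap<>();
        if (!userConfig.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
          inject.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        }
        if (!userConfig.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
          inject.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        }
        return inject; // merged into the final consumer config downstream
      }
    }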
