more review comments
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/190a3999 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/190a3999 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/190a3999 Branch: refs/heads/NewKafkaSystemConsumer Commit: 190a39990a9281511e07876636b6c8784337d35a Parents: 93ca950 Author: Boris S <[email protected]> Authored: Tue Sep 25 15:55:15 2018 -0700 Committer: Boris S <[email protected]> Committed: Tue Sep 25 15:55:15 2018 -0700 ---------------------------------------------------------------------- .../samza/config/KafkaConsumerConfig.java | 47 ++- .../samza/system/kafka/KafkaConsumerProxy.java | 345 +++++++++---------- .../samza/system/kafka/KafkaSystemConsumer.java | 49 ++- .../kafka/KafkaSystemConsumerMetrics.scala | 2 +- .../samza/config/TestKafkaConsumerConfig.java | 19 +- 5 files changed, 235 insertions(+), 227 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/190a3999/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java index 7d2408b..3fa66e5 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java @@ -31,6 +31,7 @@ import org.apache.samza.SamzaException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; +import scala.runtime.AbstractFunction0; /** @@ -40,9 +41,9 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class); - private static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer"; - 
private static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer"; - private static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin"; + static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer"; + static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer"; + static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin"; /* * By default, KafkaConsumer will fetch some big number of available messages for all the partitions. @@ -55,12 +56,12 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { } /** - * This is a help method to create the configs for use in Kafka consumer. + * Helper method to create configs for use in Kafka consumer. * The values are based on the "consumer" subset of the configs provided by the app and Samza overrides. * - * @param config - config provided by the app. - * @param systemName - system name for which the consumer is configured. - * @param clientId - client id to be used in the Kafka consumer. + * @param config config provided by the app. + * @param systemName system name to get the consumer configuration for. + * @param clientId client id to be used in the Kafka consumer. 
* @return KafkaConsumerConfig */ public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) { @@ -85,7 +86,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))); - // make sure bootstrap configs are in, if not - get them from the producer + // if consumer bootstrap servers are not configured, get them from the producer configs if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) { String bootstrapServers = config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); @@ -119,10 +120,19 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { // group id should be unique per job static String getConsumerGroupId(Config config) { JobConfig jobConfig = new JobConfig(config); - Option<String> jobIdOption = jobConfig.getJobId(); - Option<String> jobNameOption = jobConfig.getName(); - return (jobNameOption.isDefined() ? jobNameOption.get() : "undefined_job_name") + "-" + (jobIdOption.isDefined() - ? jobIdOption.get() : "undefined_job_id"); + Option jobNameOption = jobConfig.getName(); + if (jobNameOption.isEmpty()) { + throw new ConfigException("Missing job name"); + } + String jobName = (String) jobNameOption.get(); + + Option jobIdOption = jobConfig.getJobId(); + String jobId = "1"; + if (! 
jobIdOption.isEmpty()) { + jobId = (String) jobIdOption.get(); + } + + return String.format("%s-%s", jobName, jobId); } // client id should be unique per job @@ -139,11 +149,18 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { } static String getConsumerClientId(String id, Config config) { - if (config.get(JobConfig.JOB_NAME()) == null) { + JobConfig jobConfig = new JobConfig(config); + Option jobNameOption = jobConfig.getName(); + if (jobNameOption.isEmpty()) { throw new ConfigException("Missing job name"); } - String jobName = config.get(JobConfig.JOB_NAME()); - String jobId = (config.get(JobConfig.JOB_ID()) != null) ? config.get(JobConfig.JOB_ID()) : "1"; + String jobName = (String) jobNameOption.get(); + + Option jobIdOption = jobConfig.getJobId(); + String jobId = "1"; + if (! jobIdOption.isEmpty()) { + jobId = (String) jobIdOption.get(); + } return String.format("%s-%s-%s", id.replaceAll("\\W", "_"), jobName.replaceAll("\\W", "_"), jobId.replaceAll("\\W", "_")); http://git-wip-us.apache.org/repos/asf/samza/blob/190a3999/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java index d2f7096..04071c1 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -30,13 +30,10 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import javax.print.DocFlavor; -import kafka.common.KafkaException; import kafka.common.TopicAndPartition; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import 
org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; @@ -53,21 +50,21 @@ import org.slf4j.LoggerFactory; * This class is not thread safe. There will be only one instance of this class per KafkaSystemConsumer object. * We still need some synchronization around kafkaConsumer. See pollConsumer() method for details. */ -/*package private */class KafkaConsumerProxy<K, V> { +class KafkaConsumerProxy<K, V> { private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProxy.class); private static final int SLEEP_MS_WHILE_NO_TOPIC_PARTITION = 100; - /* package private */ final Thread consumerPollThread; + final Thread consumerPollThread; private final Consumer<K, V> kafkaConsumer; private final KafkaSystemConsumer.KafkaConsumerMessageSink sink; private final KafkaSystemConsumerMetrics kafkaConsumerMetrics; private final String metricName; private final String systemName; private final String clientId; - private final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>(); + private final Map<TopicPartition, SystemStreamPartition> topicPartitionToSSP = new HashMap<>(); private final Map<SystemStreamPartition, MetricName> perPartitionMetrics = new HashMap<>(); - // list of all the SSPs we poll from, with their next offsets correspondingly. + // list of all the SSPs we poll from, with their next(most recently read + 1) offsets correspondingly. private final Map<SystemStreamPartition, Long> nextOffsets = new ConcurrentHashMap<>(); // lags behind the high water mark, as reported by the Kafka consumer. 
private final Map<SystemStreamPartition, Long> latestLags = new HashMap<>(); @@ -76,7 +73,6 @@ import org.slf4j.LoggerFactory; private volatile Throwable failureCause = null; private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1); - // package private constructor KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId, KafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics, String metricName) { @@ -96,14 +92,46 @@ import org.slf4j.LoggerFactory; "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName); } - @Override - public String toString() { - return String.format("consumerProxy-%s-%s", systemName, clientId); + /** + * Add new partition to the list of polled partitions. + * Must only be called before {@link KafkaConsumerProxy#start} is called. + */ + public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) { + LOG.info(String.format("Adding new topicPartition %s with offset %s to queue for consumer %s", ssp, nextOffset, + this)); + topicPartitionToSSP.put(KafkaSystemConsumer.toTopicPartition(ssp), ssp); //registered SSPs + + // this is already vetted offset so there is no need to validate it + nextOffsets.put(ssp, nextOffset); + + kafkaConsumerMetrics.setNumTopicPartitions(metricName, nextOffsets.size()); + } + + /** + * Stop this KafkaConsumerProxy and wait for at most {@code timeoutMs}.
+ * @param timeoutMs maximum time to wait to stop this KafkaConsumerProxy + */ + public void stop(long timeoutMs) { + LOG.info("Shutting down KafkaConsumerProxy poll thread {} for {}", consumerPollThread.getName(), this); + + isRunning = false; + try { + consumerPollThread.join(timeoutMs/2); + // join() may timeout + // in this case we should interrupt it and wait again + if (consumerPollThread.isAlive()) { + consumerPollThread.interrupt(); + consumerPollThread.join(timeoutMs/2); + } + } catch (InterruptedException e) { + LOG.warn("Join in KafkaConsumerProxy has failed", e); + consumerPollThread.interrupt(); + } } public void start() { if (!consumerPollThread.isAlive()) { - LOG.info("Starting KafkaConsumerProxy polling thread for system " + systemName + " " + this.toString()); + LOG.info("Starting KafkaConsumerProxy polling thread for " + this.toString()); consumerPollThread.start(); @@ -112,70 +140,124 @@ import org.slf4j.LoggerFactory; try { consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - LOG.info("Got InterruptedException", e); + LOG.info("Ignoring InterruptedException while waiting for consumer poll thread to start.", e); } } } else { LOG.warn("Tried to start an already started KafkaConsumerProxy (%s). Ignoring.", this.toString()); } - if (topicPartitions2SSP.size() == 0) { - String msg = String.format("Cannot start empty set of TopicPartitions for system %s, clientid %s", - systemName, clientId); + if (topicPartitionToSSP.size() == 0) { + String msg = String.format("Cannot start KafkaConsumerProxy without any registered TopicPartitions for %s", systemName); LOG.error(msg); throw new SamzaException(msg); } } - /** - * Stop the thread and wait for it to stop. 
- * @param timeoutMs how long to wait in join - */ - public void stop(long timeoutMs) { - LOG.info("Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName()); + boolean isRunning() { + return isRunning; + } - isRunning = false; - try { - consumerPollThread.join(timeoutMs); - // join returns event if the thread didn't finish - // in this case we should interrupt it and wait again - if (consumerPollThread.isAlive()) { - consumerPollThread.interrupt(); - consumerPollThread.join(timeoutMs); + Throwable getFailureCause() { + return failureCause; + } + + private void initializeLags() { + // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag. + Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitionToSSP.keySet()); + endOffsets.forEach((tp, offset) -> { + SystemStreamPartition ssp = topicPartitionToSSP.get(tp); + long startingOffset = nextOffsets.get(ssp); + // End offsets are the offset of the newest message + 1 + // If the message we are about to consume is < end offset, we are starting with a lag. + long initialLag = endOffsets.get(tp) - startingOffset; + + LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset); + latestLags.put(ssp, initialLag); + sink.setIsAtHighWatermark(ssp, initialLag == 0); + }); + + // initialize lag metrics + refreshLagMetrics(); + } + + // creates a separate thread for getting the messages. 
+ private Runnable createProxyThreadRunnable() { + Runnable runnable = () -> { + isRunning = true; + + try { + consumerPollThreadStartLatch.countDown(); + LOG.info("Starting consumer poll thread {} for system {}", consumerPollThread.getName(), systemName); + initializeLags(); + while (isRunning) { + fetchMessages(); + } + } catch (Throwable throwable) { + LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable); + // KafkaSystemConsumer uses the failureCause to propagate the throwable to the container + failureCause = throwable; + isRunning = false; } - } catch (InterruptedException e) { - LOG.warn("Join in KafkaConsumerProxy has failed", e); - consumerPollThread.interrupt(); - } + + if (!isRunning) { + LOG.info("KafkaConsumerProxy for system {} has stopped.", systemName); + } + }; + + return runnable; } - /** - * Add new partition to the list of polled partitions. - * This method should be called only at the beginning, before the thread is started. 
- */ - public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) { - LOG.info(String.format("Adding new topic and partition %s, offset = %s to queue for consumer %s", ssp, nextOffset, - this)); - topicPartitions2SSP.put(KafkaSystemConsumer.toTopicPartition(ssp), ssp); //registered SSPs + private void fetchMessages() { + Set<SystemStreamPartition> sspsToFetch = new HashSet<>(); + for (SystemStreamPartition ssp : nextOffsets.keySet()) { + if (sink.needsMoreMessages(ssp)) { + sspsToFetch.add(ssp); + } + } + LOG.debug("pollConsumer for {} SSPs: {}", sspsToFetch.size(), sspsToFetch); + if (!sspsToFetch.isEmpty()) { + kafkaConsumerMetrics.incClientReads(metricName); - // this is already vetted offset so there is no need to validate it - LOG.info(String.format("Got offset %s for new topic and partition %s.", nextOffset, ssp)); + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response; - nextOffsets.put(ssp, nextOffset); + response = pollConsumer(sspsToFetch, 500L); + + // move the responses into the queue + for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) { + List<IncomingMessageEnvelope> envelopes = e.getValue(); + if (envelopes != null) { + moveMessagesToTheirQueue(e.getKey(), envelopes); + } + } + + populateCurrentLags(sspsToFetch); // find current lags for for each SSP + } else { // nothing to read + + LOG.debug("No topic/partitions need to be fetched for system {} right now. 
Sleeping {}ms.", systemName, + SLEEP_MS_WHILE_NO_TOPIC_PARTITION); + + kafkaConsumerMetrics.incClientSkippedFetchRequests(metricName); - kafkaConsumerMetrics.setTopicPartitionValue(metricName, nextOffsets.size()); + try { + Thread.sleep(SLEEP_MS_WHILE_NO_TOPIC_PARTITION); + } catch (InterruptedException e) { + LOG.warn("Sleep in fetchMessages was interrupted"); + } + } + refreshLagMetrics(); } // the actual polling of the messages from kafka private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollConsumer( - Set<SystemStreamPartition> systemStreamPartitions, long timeout) { + Set<SystemStreamPartition> systemStreamPartitions, long timeoutMs) { // Since we need to poll only from some subset of TopicPartitions (passed as the argument), // we need to pause the rest. List<TopicPartition> topicPartitionsToPause = new ArrayList<>(); List<TopicPartition> topicPartitionsToPoll = new ArrayList<>(); - for (Map.Entry<TopicPartition, SystemStreamPartition> e : topicPartitions2SSP.entrySet()) { + for (Map.Entry<TopicPartition, SystemStreamPartition> e : topicPartitionToSSP.entrySet()) { TopicPartition tp = e.getKey(); SystemStreamPartition ssp = e.getValue(); if (systemStreamPartitions.contains(ssp)) { @@ -186,21 +268,18 @@ import org.slf4j.LoggerFactory; } ConsumerRecords<K, V> records; - try { // Synchronize, in case the consumer is used in some other thread (metadata or something else) synchronized (kafkaConsumer) { // Since we are not polling from ALL the subscribed topics, so we need to "change" the subscription temporarily kafkaConsumer.pause(topicPartitionsToPause); kafkaConsumer.resume(topicPartitionsToPoll); - records = kafkaConsumer.poll(timeout); - // resume original set of subscription - may be required for checkpointing - kafkaConsumer.resume(topicPartitionsToPause); + records = kafkaConsumer.poll(timeoutMs); } } catch (Exception e) { // we may get InvalidOffsetException | AuthorizationException | KafkaException exceptions, // but we still just 
rethrow, and log it up the stack. - LOG.error("Caught a Kafka exception in pollConsumer", e); + LOG.error("Caught a Kafka exception in pollConsumer for system " + systemName, e); throw e; } @@ -209,12 +288,11 @@ import org.slf4j.LoggerFactory; private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> processResults(ConsumerRecords<K, V> records) { if (records == null) { - throw new SamzaException("ERROR:records is null, after pollConsumer call (in processResults)"); + throw new SamzaException("Received null 'records' after polling consumer in KafkaConsumerProxy " + this); } Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new HashMap<>(records.count()); // Parse the returned records and convert them into the IncomingMessageEnvelope. - // Note. They have been already de-serialized by the consumer. for (ConsumerRecord<K, V> record : records) { int partition = record.partition(); String topic = record.topic(); @@ -222,18 +300,18 @@ import org.slf4j.LoggerFactory; updateMetrics(record, tp); - SystemStreamPartition ssp = topicPartitions2SSP.get(tp); - List<IncomingMessageEnvelope> listMsgs = results.get(ssp); - if (listMsgs == null) { - listMsgs = new ArrayList<>(); - results.put(ssp, listMsgs); + SystemStreamPartition ssp = topicPartitionToSSP.get(tp); + List<IncomingMessageEnvelope> messages = results.get(ssp); + if (messages == null) { + messages = new ArrayList<>(); + results.put(ssp, messages); } - final K key = record.key(); - final Object value = record.value(); - final IncomingMessageEnvelope imEnvelope = + K key = record.key(); + Object value = record.value(); + IncomingMessageEnvelope imEnvelope = new IncomingMessageEnvelope(ssp, String.valueOf(record.offset()), key, value, getRecordSize(record)); - listMsgs.add(imEnvelope); + messages.add(imEnvelope); } if (LOG.isDebugEnabled()) { LOG.debug("# records per SSP:"); @@ -246,52 +324,6 @@ import org.slf4j.LoggerFactory; return results; } - // creates a separate thread for getting the 
messages. - private Runnable createProxyThreadRunnable() { - Runnable runnable= () -> { - isRunning = true; - - try { - consumerPollThreadStartLatch.countDown(); - LOG.info("Starting runnable " + consumerPollThread.getName()); - initializeLags(); - while (isRunning) { - fetchMessages(); - } - } catch (Throwable throwable) { - LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable); - // KafkaSystemConsumer uses the failureCause to propagate the throwable to the container - failureCause = throwable; - isRunning = false; - } - - if (!isRunning) { - LOG.info("KafkaConsumerProxy for system {} has stopped.", systemName); - } - }; - - return runnable; - } - - private void initializeLags() { - // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag. - Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions2SSP.keySet()); - endOffsets.forEach((tp, offset) -> { - SystemStreamPartition ssp = topicPartitions2SSP.get(tp); - long startingOffset = nextOffsets.get(ssp); - // End offsets are the offset of the newest message + 1 - // If the message we are about to consume is < end offset, we are starting with a lag. - long initialLag = endOffsets.get(tp) - startingOffset; - - LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset); - latestLags.put(ssp, initialLag); - sink.setIsAtHighWatermark(ssp, initialLag == 0); - }); - - // initialize lag metrics - refreshLatencyMetrics(); - } - private int getRecordSize(ConsumerRecord<K, V> r) { int keySize = (r.key() == null) ? 
0 : r.serializedKeySize(); return keySize + r.serializedValueSize(); @@ -300,10 +332,16 @@ import org.slf4j.LoggerFactory; private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) { TopicAndPartition tap = KafkaSystemConsumer.toTopicAndPartition(tp); SystemStreamPartition ssp = new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition())); - long currentSSPLag = getLatestLag(ssp); // lag between the current offset and the highwatermark + + Long lag = latestLags.get(ssp); + if (lag == null) { + throw new SamzaException("Unknown/unregistered ssp in latestLags. ssp=" + ssp + "; system=" + systemName); + } + long currentSSPLag = lag.longValue(); // lag between the current offset and the highwatermark if (currentSSPLag < 0) { return; } + long recordOffset = r.offset(); long highWatermark = recordOffset + currentSSPLag; // derived value for the highwatermark @@ -315,7 +353,6 @@ import org.slf4j.LoggerFactory; kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark); } - private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List<IncomingMessageEnvelope> envelopes) { long nextOffset = nextOffsets.get(ssp); @@ -329,16 +366,6 @@ import org.slf4j.LoggerFactory; nextOffsets.put(ssp, nextOffset); } - private void populateMetricNames(Set<SystemStreamPartition> ssps) { - HashMap<String, String> tags = new HashMap<>(); - tags.put("client-id", clientId);// this is required by the KafkaConsumer to get the metrics - - for (SystemStreamPartition ssp : ssps) { - TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp); - perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)); - } - } - // The only way to figure out lag for the KafkaConsumer is to look at the metrics after each poll() call. // One of the metrics (records-lag) shows how far behind the HighWatermark the consumer is. // This method populates the lag information for each SSP into latestLags member variable. 
@@ -348,17 +375,23 @@ import org.slf4j.LoggerFactory; // populate the MetricNames first time if (perPartitionMetrics.isEmpty()) { - populateMetricNames(ssps); + HashMap<String, String> tags = new HashMap<>(); + tags.put("client-id", clientId); // this is required by the KafkaConsumer to get the metrics + + for (SystemStreamPartition ssp : ssps) { + TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp); + perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)); + } } for (SystemStreamPartition ssp : ssps) { MetricName mn = perPartitionMetrics.get(ssp); - Metric currentLagM = consumerMetrics.get(mn); + Metric currentLagMetric = consumerMetrics.get(mn); // High watermark is fixed to be the offset of last available message, // so the lag is now at least 0, which is the same as Samza's definition. // If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling. - long currentLag = (currentLagM != null) ? (long) currentLagM.value() : -1L; + long currentLag = (currentLagMetric != null) ? (long) currentLagMetric.value() : -1L; latestLags.put(ssp, currentLag); // calls the setIsAtHead for the BlockingEnvelopeMap @@ -366,58 +399,7 @@ import org.slf4j.LoggerFactory; } } - // Get the latest lag for a specific SSP. - private long getLatestLag(SystemStreamPartition ssp) { - Long lag = latestLags.get(ssp); - if (lag == null) { - throw new SamzaException("Unknown/unregistered ssp in latestLags request: " + ssp); - } - return lag; - } - - // Using the consumer to poll the messages from the stream. 
- private void fetchMessages() { - Set<SystemStreamPartition> sspsToFetch = new HashSet<>(); - for (SystemStreamPartition ssp : nextOffsets.keySet()) { - if (sink.needsMoreMessages(ssp)) { - sspsToFetch.add(ssp); - } - } - LOG.debug("pollConsumer {}", sspsToFetch.size()); - if (!sspsToFetch.isEmpty()) { - kafkaConsumerMetrics.incClientReads(metricName); - - Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response; - LOG.debug("pollConsumer from following SSPs: {}; total#={}", sspsToFetch, sspsToFetch.size()); - - response = pollConsumer(sspsToFetch, 500L); - - // move the responses into the queue - for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) { - List<IncomingMessageEnvelope> envelopes = e.getValue(); - if (envelopes != null) { - moveMessagesToTheirQueue(e.getKey(), envelopes); - } - } - - populateCurrentLags(sspsToFetch); // find current lags for for each SSP - } else { // nothing to read - - LOG.debug("No topic/partitions need to be fetched for consumer {} right now. 
Sleeping {}ms.", kafkaConsumer, - SLEEP_MS_WHILE_NO_TOPIC_PARTITION); - - kafkaConsumerMetrics.incClientSkippedFetchRequests(metricName); - - try { - Thread.sleep(SLEEP_MS_WHILE_NO_TOPIC_PARTITION); - } catch (InterruptedException e) { - LOG.warn("Sleep in fetchMessages was interrupted"); - } - } - refreshLatencyMetrics(); - } - - private void refreshLatencyMetrics() { + private void refreshLagMetrics() { for (Map.Entry<SystemStreamPartition, Long> e : nextOffsets.entrySet()) { SystemStreamPartition ssp = e.getKey(); Long offset = e.getValue(); @@ -433,12 +415,9 @@ import org.slf4j.LoggerFactory; } } - boolean isRunning() { - return isRunning; - } - - Throwable getFailureCause() { - return failureCause; + @Override + public String toString() { + return String.format("consumerProxy-%s-%s", systemName, clientId); } } http://git-wip-us.apache.org/repos/asf/samza/blob/190a3999/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java index 17f29f1..10ce274 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java @@ -66,10 +66,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy final KafkaConsumerMessageSink messageSink; // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates - // BlockeingEnvelopMap's buffers. + // BlockingEnvelopMap's buffers. 
final private KafkaConsumerProxy proxy; - // keep registration data until the start - mapping between registered SSPs and topicPartitions, and the offsets + // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>(); final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>(); @@ -77,10 +77,11 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy long perPartitionFetchThresholdBytes; /** + * Create a KafkaSystemConsumer for the provided {@code systemName} * @param systemName system name for which we create the consumer - * @param config config passed into the the app - * @param metrics metrics collecting object - * @param clock - system clock, allows to override internal clock (System.currentTimeMillis()) + * @param config application config + * @param metrics metrics for this KafkaSystemConsumer + * @param clock system clock */ public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) { @@ -99,11 +100,9 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy messageSink = new KafkaConsumerMessageSink(); // Create the proxy to do the actual message reading. 
- String metricName = String.format("%s %s", systemName, clientId); + String metricName = String.format("%s", systemName); proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, metrics, metricName); LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy); - - LOG.info("{}: Created KafkaSystemConsumer {}", this, kafkaConsumer); } /** @@ -118,7 +117,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy // extract kafka client configs KafkaConsumerConfig consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId); - LOG.info("{}:{} KafkaClient properties {}", systemName, clientId, consumerConfig); + LOG.info("{}: KafkaClient properties {}", systemName, consumerConfig); return new KafkaConsumer(consumerConfig); } @@ -130,7 +129,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy return; } if (stopped.get()) { - LOG.warn("{}: Attempting to start a stopped consumer", this); + LOG.error("{}: Attempting to start a stopped consumer", this); return; } // initialize the subscriptions for all the registered TopicPartitions @@ -151,8 +150,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy kafkaConsumer.assign(topicPartitionsToSSP.keySet()); } } catch (Exception e) { - LOG.warn("{}: Start subscription failed", this); - throw new SamzaException(e); + throw new SamzaException("Consumer subscription failed for " + this, e); } } @@ -164,7 +162,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy void startConsumer() { // set the offset for each TopicPartition if (topicPartitionsToOffset.size() <= 0) { - LOG.warn("{}: Consumer is not subscribed to any SSPs", this); + LOG.error ("{}: Consumer is not subscribed to any SSPs", this); } topicPartitionsToOffset.forEach((tp, startingOffsetString) -> { @@ -204,35 +202,30 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap 
implements Sy long fetchThreshold = FETCH_THRESHOLD; if (fetchThresholdOption.isDefined()) { fetchThreshold = Long.valueOf(fetchThresholdOption.get()); - LOG.info("{}: fetchThresholdOption is configured. fetchThreshold={}", this, fetchThreshold); } Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName); long fetchThresholdBytes = FETCH_THRESHOLD_BYTES; if (fetchThresholdBytesOption.isDefined()) { fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get()); - LOG.info("{}: fetchThresholdBytesOption is configured. fetchThresholdBytes={}", this, fetchThresholdBytes); } - int numTPs = topicPartitionsToSSP.size(); - if (numTPs != topicPartitionsToOffset.size()) { + int numPartitions = topicPartitionsToSSP.size(); + if (numPartitions != topicPartitionsToOffset.size()) { throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()"); } - LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; partitions num={}", this, fetchThresholdBytes, - fetchThreshold, numTPs); - if (numTPs > 0) { - perPartitionFetchThreshold = fetchThreshold / numTPs; - LOG.info("{}: perPartitionFetchThreshold={}", this, perPartitionFetchThreshold); + if (numPartitions > 0) { + perPartitionFetchThreshold = fetchThreshold / numPartitions; if (fetchThresholdBytesEnabled) { // currently this feature cannot be enabled, because we do not have the size of the messages available. // messages get double buffered, hence divide by 2 - perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs; - LOG.info("{} :perPartitionFetchThresholdBytes is enabled. 
perPartitionFetchThresholdBytes={}", this, - perPartitionFetchThresholdBytes); + perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numPartitions; } } + LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; numPartitions={}, perPartitionFetchThreshold={}, perPartitionFetchThresholdBytes(0 if disabled)={}", + this, fetchThresholdBytes, fetchThreshold, numPartitions, perPartitionFetchThreshold, perPartitionFetchThresholdBytes); } @Override @@ -260,8 +253,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy } } - /* - record the ssp and the offset. Do not submit it to the consumer yet. + /** + * record the ssp and the offset. Do not submit it to the consumer yet. + * @param systemStreamPartition ssp to register + * @param offset offset to register with */ @Override public void register(SystemStreamPartition systemStreamPartition, String offset) { http://git-wip-us.apache.org/repos/asf/samza/blob/190a3999/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala index c4552e6..59a8854 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala @@ -55,7 +55,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr // java friendlier interfaces // Gauges - def setTopicPartitionValue(clientName: String, value: Int) { + def setNumTopicPartitions(clientName: String, value: Int) { topicPartitions.get(clientName).set(value) } http://git-wip-us.apache.org/repos/asf/samza/blob/190a3999/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java 
---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java index 35a717a..de5d093 100644 --- a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java +++ b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java @@ -28,7 +28,7 @@ import org.junit.Test; public class TestKafkaConsumerConfig { - private final Map<String, String> props = new HashMap<>(); + public final static String SYSTEM_NAME = "testSystem"; public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer."; public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer."; @@ -36,6 +36,7 @@ public class TestKafkaConsumerConfig { @Test public void testDefaults() { + Map<String, String> props = new HashMap<>(); props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, @@ -43,6 +44,8 @@ public class TestKafkaConsumerConfig { props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // should NOT be ignored + props.put(JobConfig.JOB_NAME(), "jobName"); + // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignroeThis:9092"); props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092"); @@ -72,11 +75,23 @@ public class TestKafkaConsumerConfig { Assert.assertEquals(KafkaConsumerConfig.getConsumerGroupId(config), kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG)); + + Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-1", + 
KafkaConsumerConfig.getConsumerClientId(config)); + Assert.assertEquals("jobName-1", KafkaConsumerConfig.getConsumerGroupId(config)); + + props.put(JobConfig.JOB_ID(), "jobId"); + config = new MapConfig(props); + + Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-jobId", + KafkaConsumerConfig.getConsumerClientId(config)); + Assert.assertEquals("jobName-jobId", KafkaConsumerConfig.getConsumerGroupId(config)); } // test stuff that should not be overridden @Test public void testNotOverride() { + Map<String, String> props = new HashMap<>(); // if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092"); @@ -85,6 +100,8 @@ public class TestKafkaConsumerConfig { props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName()); + props.put(JobConfig.JOB_NAME(), "jobName"); + Config config = new MapConfig(props); KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID);
