review comments
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5397a34e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5397a34e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5397a34e Branch: refs/heads/NewKafkaSystemConsumer Commit: 5397a34e2a6a5df0d7ae088ec2b309e65b53b4e7 Parents: 1d1fb89 Author: Boris S <[email protected]> Authored: Mon Sep 24 10:54:27 2018 -0700 Committer: Boris S <[email protected]> Committed: Mon Sep 24 10:54:27 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/container/SamzaContainer.scala | 2 +- .../clients/consumer/KafkaConsumerConfig.java | 194 ---------------- .../samza/config/KafkaConsumerConfig.java | 198 +++++++++++++++++ .../samza/system/kafka/KafkaConsumerProxy.java | 220 +++++++++---------- .../samza/system/kafka/KafkaSystemConsumer.java | 187 ++++++++-------- .../kafka/KafkaSystemConsumerMetrics.scala | 2 +- .../samza/system/kafka/KafkaSystemFactory.scala | 4 +- .../consumer/TestKafkaConsumerConfig.java | 137 ------------ .../samza/config/TestKafkaConsumerConfig.java | 152 +++++++++++++ .../system/kafka/TestKafkaSystemConsumer.java | 12 +- 10 files changed, 552 insertions(+), 556 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index e71fcb3..fba7329 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -822,7 +822,7 @@ class SamzaContainer( } try { - info("Shutting down SamzaContaier.") + 
info("Shutting down SamzaContainer.") removeShutdownHook jmxServer.stop http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java deleted file mode 100644 index 8ada1b4..0000000 --- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -package org.apache.kafka.clients.consumer; - -import java.util.Map; -import java.util.Properties; -import org.apache.commons.lang3.StringUtils; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.config.ConfigException; -import org.apache.samza.config.JobConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Option; - - -/** - * The configuration class for KafkaConsumer - */ -public class KafkaConsumerConfig extends ConsumerConfig { - - public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class); - - private static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer"; - private static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer"; - private static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin"; - private static final String SAMZA_OFFSET_LARGEST = "largest"; - private static final String SAMZA_OFFSET_SMALLEST = "smallest"; - private static final String KAFKA_OFFSET_LATEST = "latest"; - private static final String KAFKA_OFFSET_EARLIEST = "earliest"; - private static final String KAFKA_OFFSET_NONE = "none"; - - /* - * By default, KafkaConsumer will fetch ALL available messages for all the partitions. - * This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll(). - */ - static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100"; - - private KafkaConsumerConfig(Properties props) { - super(props); - } - - /** - * Create kafka consumer configs, based on the subset of global configs. 
- * @param config - * @param systemName - * @param clientId - * @param injectProps - * @return KafkaConsumerConfig - */ - public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId, - Map<String, String> injectProps) { - - final Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true); - - final String groupId = getConsumerGroupId(config); - - final Properties consumerProps = new Properties(); - consumerProps.putAll(subConf); - - consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); - consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId); - - //Kafka client configuration - - // put overrides - consumerProps.putAll(injectProps); - - // These are values we enforce in sazma, and they cannot be overwritten. - - // Disable consumer auto-commit because Samza controls commits - consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - - // Translate samza config value to kafka config value - consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - getAutoOffsetResetValue(consumerProps)); - - // make sure bootstrap configs are in ?? SHOULD WE FAIL IF THEY ARE NOT? - if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) { - // get it from the producer config - String bootstrapServers = - config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); - if (StringUtils.isEmpty(bootstrapServers)) { - throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for " + systemName); - } - consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - } - - // Always use default partition assignment strategy. Do not allow override. - consumerProps.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); - - // the consumer is fully typed, and deserialization can be too. 
But in case it is not provided we should - // default to byte[] - if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { - LOG.info("setting default key serialization for the consumer(for {}) to ByteArrayDeserializer", systemName); - consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - } - if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { - LOG.info("setting default value serialization for the consumer(for {}) to ByteArrayDeserializer", systemName); - consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - } - - // Override default max poll config if there is no value - consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, - (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS); - - return new KafkaConsumerConfig(consumerProps); - } - - // group id should be unique per job - static String getConsumerGroupId(Config config) { - JobConfig jobConfig = new JobConfig(config); - Option<String> jobIdOption = jobConfig.getJobId(); - Option<String> jobNameOption = jobConfig.getName(); - return (jobNameOption.isDefined() ? jobNameOption.get() : "undefined_job_name") + "-" + (jobIdOption.isDefined() - ? 
jobIdOption.get() : "undefined_job_id"); - } - - // client id should be unique per job - public static String getConsumerClientId(Config config) { - return getConsumerClientId(CONSUMER_CLIENT_ID_PREFIX, config); - } - public static String getProducerClientId(Config config) { - return getConsumerClientId(PRODUCER_CLIENT_ID_PREFIX, config); - } - public static String getAdminClientId(Config config) { - return getConsumerClientId(ADMIN_CLIENT_ID_PREFIX, config); - } - - private static String getConsumerClientId(String id, Config config) { - if (config.get(JobConfig.JOB_NAME()) == null) { - throw new ConfigException("Missing job name"); - } - String jobName = config.get(JobConfig.JOB_NAME()); - String jobId = (config.get(JobConfig.JOB_ID()) != null) ? config.get(JobConfig.JOB_ID()) : "1"; - - return String.format("%s-%s-%s", id.replaceAll("[^A-Za-z0-9]", "_"), jobName.replaceAll("[^A-Za-z0-9]", "_"), - jobId.replaceAll("[^A-Za-z0-9]", "_")); - } - - /** - * If settings for auto.reset in samza are different from settings in Kafka (auto.offset.reset), - * then need to convert them (see kafka.apache.org/documentation): - * "largest" -> "latest" - * "smallest" -> "earliest" - * - * If no setting specified we return "latest" (same as Kafka). 
- * @param properties All consumer related {@link Properties} parsed from samza config - * @return String representing the config value for "auto.offset.reset" property - */ - static String getAutoOffsetResetValue(Properties properties) { - String autoOffsetReset = properties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KAFKA_OFFSET_LATEST); - - // accept kafka values directly - if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST) - || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) { - return autoOffsetReset; - } - - String newAutoOffsetReset; - switch (autoOffsetReset) { - case SAMZA_OFFSET_LARGEST: - newAutoOffsetReset = KAFKA_OFFSET_LATEST; - break; - case SAMZA_OFFSET_SMALLEST: - newAutoOffsetReset = KAFKA_OFFSET_EARLIEST; - break; - default: - newAutoOffsetReset = KAFKA_OFFSET_LATEST; - } - LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset); - return newAutoOffsetReset; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..4bbe00f --- /dev/null +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java @@ -0,0 +1,198 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.samza.config; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.samza.SamzaException; +import org.apache.samza.config.JobConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + + +/** + * The configuration class for KafkaConsumer + */ +public class KafkaConsumerConfig extends HashMap<String, Object> { + + public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class); + + private static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer"; + private static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer"; + private static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin"; + + /* + * By default, KafkaConsumer will fetch some big number of available messages for all the partitions. + * This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll(). + */ + static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100"; + + private KafkaConsumerConfig(Map<String, Object> map) { + super(map); + } + + /** + * This is a help method to create the configs for use in Kafka consumer. + * The values are based on the "consumer" subset of the configs provided by the app and Samza overrides. 
+ * + * @param config - config provided by the app. + * @param systemName - system name for which the consumer is configured. + * @param clientId - client id to be used in the Kafka consumer. + * @return KafkaConsumerConfig + */ + public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) { + + Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true); + + //Kafka client configuration + String groupId = getConsumerGroupId(config); + + Map<String, Object> consumerProps = new HashMap<>(); + consumerProps.putAll(subConf); + + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); + + + // These are values we enforce in sazma, and they cannot be overwritten. + + // Disable consumer auto-commit because Samza controls commits + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + // Translate samza config value to kafka config value + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + getAutoOffsetResetValue((String)consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))); + + // make sure bootstrap configs are in, if not - get them from the producer + if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + String bootstrapServers = + config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + if (StringUtils.isEmpty(bootstrapServers)) { + throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for " + systemName); + } + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } + + // Always use default partition assignment strategy. Do not allow override. + consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); + + // the consumer is fully typed, and deserialization can be too. 
But in case it is not provided we should + // default to byte[] + if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { + LOG.info("setting key serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + } + if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { + LOG.info("setting value serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + } + + // Override default max poll config if there is no value + consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS); + + return new KafkaConsumerConfig(consumerProps); + } + + // group id should be unique per job + static String getConsumerGroupId(Config config) { + JobConfig jobConfig = new JobConfig(config); + Option<String> jobIdOption = jobConfig.getJobId(); + Option<String> jobNameOption = jobConfig.getName(); + return (jobNameOption.isDefined() ? jobNameOption.get() : "undefined_job_name") + "-" + (jobIdOption.isDefined() + ? 
jobIdOption.get() : "undefined_job_id"); + } + + // client id should be unique per job + public static String getConsumerClientId(Config config) { + return getConsumerClientId(CONSUMER_CLIENT_ID_PREFIX, config); + } + + public static String getProducerClientId(Config config) { + return getConsumerClientId(PRODUCER_CLIENT_ID_PREFIX, config); + } + + public static String getAdminClientId(Config config) { + return getConsumerClientId(ADMIN_CLIENT_ID_PREFIX, config); + } + + static String getConsumerClientId(String id, Config config) { + if (config.get(JobConfig.JOB_NAME()) == null) { + throw new ConfigException("Missing job name"); + } + String jobName = config.get(JobConfig.JOB_NAME()); + String jobId = (config.get(JobConfig.JOB_ID()) != null) ? config.get(JobConfig.JOB_ID()) : "1"; + + return String.format("%s-%s-%s", id.replaceAll( + "\\W", "_"), + jobName.replaceAll("\\W", "_"), + jobId.replaceAll("\\W", "_")); + } + + /** + * If settings for auto.reset in samza are different from settings in Kafka (auto.offset.reset), + * then need to convert them (see kafka.apache.org/documentation): + * "largest" -> "latest" + * "smallest" -> "earliest" + * + * If no setting specified we return "latest" (same as Kafka). 
+ * @param autoOffsetReset value from the app provided config + * @return String representing the config value for "auto.offset.reset" property + */ + static String getAutoOffsetResetValue(final String autoOffsetReset) { + final String SAMZA_OFFSET_LARGEST = "largest"; + final String SAMZA_OFFSET_SMALLEST = "smallest"; + final String KAFKA_OFFSET_LATEST = "latest"; + final String KAFKA_OFFSET_EARLIEST = "earliest"; + final String KAFKA_OFFSET_NONE = "none"; + + if (autoOffsetReset == null) { + return KAFKA_OFFSET_LATEST; // return default + } + + // accept kafka values directly + if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST) + || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) { + return autoOffsetReset; + } + + String newAutoOffsetReset; + switch (autoOffsetReset) { + case SAMZA_OFFSET_LARGEST: + newAutoOffsetReset = KAFKA_OFFSET_LATEST; + break; + case SAMZA_OFFSET_SMALLEST: + newAutoOffsetReset = KAFKA_OFFSET_EARLIEST; + break; + default: + newAutoOffsetReset = KAFKA_OFFSET_LATEST; + } + LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset); + return newAutoOffsetReset; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java index b67df0a..d2f7096 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import 
javax.print.DocFlavor; import kafka.common.KafkaException; import kafka.common.TopicAndPartition; import org.apache.kafka.clients.consumer.Consumer; @@ -47,7 +48,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Separate thread that reads messages from kafka and puts them into the BlockingEnvelopeMap. + * This class contains a separate thread that reads messages from kafka and puts them into the BlockingEnvelopeMap + * through KafkaSystemConsumer.KafkaConsumerMessageSink object. * This class is not thread safe. There will be only one instance of this class per KafkaSystemConsumer object. * We still need some synchronization around kafkaConsumer. See pollConsumer() method for details. */ @@ -74,7 +76,8 @@ import org.slf4j.LoggerFactory; private volatile Throwable failureCause = null; private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1); - /* package private */KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId, + // package private constructor + KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId, KafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics, String metricName) { @@ -93,6 +96,11 @@ import org.slf4j.LoggerFactory; "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName); } + @Override + public String toString() { + return String.format("consumerProxy-%s-%s", systemName, clientId); + } + public void start() { if (!consumerPollThread.isAlive()) { LOG.info("Starting KafkaConsumerProxy polling thread for system " + systemName + " " + this.toString()); @@ -108,12 +116,43 @@ import org.slf4j.LoggerFactory; } } } else { - LOG.debug("Tried to start an already started KafkaConsumerProxy (%s). Ignoring.", this.toString()); + LOG.warn("Tried to start an already started KafkaConsumerProxy (%s). 
Ignoring.", this.toString()); + } + + if (topicPartitions2SSP.size() == 0) { + String msg = String.format("Cannot start empty set of TopicPartitions for system %s, clientid %s", + systemName, clientId); + LOG.error(msg); + throw new SamzaException(msg); } } - // add new partition to the list of polled partitions - // this method is called only at the beginning, before the thread is started + /** + * Stop the thread and wait for it to stop. + * @param timeoutMs how long to wait in join + */ + public void stop(long timeoutMs) { + LOG.info("Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName()); + + isRunning = false; + try { + consumerPollThread.join(timeoutMs); + // join returns event if the thread didn't finish + // in this case we should interrupt it and wait again + if (consumerPollThread.isAlive()) { + consumerPollThread.interrupt(); + consumerPollThread.join(timeoutMs); + } + } catch (InterruptedException e) { + LOG.warn("Join in KafkaConsumerProxy has failed", e); + consumerPollThread.interrupt(); + } + } + + /** + * Add new partition to the list of polled partitions. + * This method should be called only at the beginning, before the thread is started. + */ public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) { LOG.info(String.format("Adding new topic and partition %s, offset = %s to queue for consumer %s", ssp, nextOffset, this)); @@ -124,67 +163,13 @@ import org.slf4j.LoggerFactory; nextOffsets.put(ssp, nextOffset); - // we reuse existing metrics. 
They assume host and port for the broker - // for now fake the port with the consumer name kafkaConsumerMetrics.setTopicPartitionValue(metricName, nextOffsets.size()); } - /** - * creates a separate thread for pulling messages - */ - private Runnable createProxyThreadRunnable() { - Runnable runnable= () -> { - isRunning = true; - - try { - consumerPollThreadStartLatch.countDown(); - LOG.info("Starting runnable " + consumerPollThread.getName()); - initializeLags(); - while (isRunning) { - fetchMessages(); - } - } catch (Throwable throwable) { - LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable); - // SamzaKafkaSystemConsumer uses the failureCause to propagate the throwable to the container - failureCause = throwable; - isRunning = false; - } - - if (!isRunning) { - LOG.info("Stopping the KafkaConsumerProxy poll thread for system: {}.", systemName); - } - }; - - return runnable; - } - - private void initializeLags() { - // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag. - Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions2SSP.keySet()); - endOffsets.forEach((tp, offset) -> { - SystemStreamPartition ssp = topicPartitions2SSP.get(tp); - long startingOffset = nextOffsets.get(ssp); - // End offsets are the offset of the newest message + 1 - // If the message we are about to consume is < end offset, we are starting with a lag. 
- long initialLag = endOffsets.get(tp) - startingOffset; - - LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset); - latestLags.put(ssp, initialLag); - sink.setIsAtHighWatermark(ssp, initialLag == 0); - }); - - // initialize lag metrics - refreshLatencyMetrics(); - } - // the actual polling of the messages from kafka - public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollConsumer( + private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollConsumer( Set<SystemStreamPartition> systemStreamPartitions, long timeout) { - if (topicPartitions2SSP.size() == 0) { - throw new SamzaException("cannot poll empty set of TopicPartitions"); - } - // Since we need to poll only from some subset of TopicPartitions (passed as the argument), // we need to pause the rest. List<TopicPartition> topicPartitionsToPause = new ArrayList<>(); @@ -201,10 +186,9 @@ import org.slf4j.LoggerFactory; } ConsumerRecords<K, V> records; - // make a call on the client + try { - // Currently, when doing checkpoint we are making a safeOffset request through this client, thus we need to synchronize - // them. In the future we may use this client for the actually checkpointing. + // Synchronize, in case the consumer is used in some other thread (metadata or something else) synchronized (kafkaConsumer) { // Since we are not polling from ALL the subscribed topics, so we need to "change" the subscription temporarily kafkaConsumer.pause(topicPartitionsToPause); @@ -213,12 +197,7 @@ import org.slf4j.LoggerFactory; // resume original set of subscription - may be required for checkpointing kafkaConsumer.resume(topicPartitionsToPause); } - } catch (InvalidOffsetException e) { - // If the consumer has thrown this exception it means that auto reset is not set for this consumer. - // So we just rethrow. 
- LOG.error("Caught InvalidOffsetException in pollConsumer", e); - throw e; - } catch (KafkaException e) { + } catch (Exception e) { // we may get InvalidOffsetException | AuthorizationException | KafkaException exceptions, // but we still just rethrow, and log it up the stack. LOG.error("Caught a Kafka exception in pollConsumer", e); @@ -230,11 +209,10 @@ import org.slf4j.LoggerFactory; private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> processResults(ConsumerRecords<K, V> records) { if (records == null) { - throw new SamzaException("processResults is called with null object for records"); + throw new SamzaException("ERROR:records is null, after pollConsumer call (in processResults)"); } - int capacity = (int) (records.count() / 0.75 + 1); // to avoid rehash, allocate more then 75% of expected capacity. - Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new HashMap<>(capacity); + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new HashMap<>(records.count()); // Parse the returned records and convert them into the IncomingMessageEnvelope. // Note. They have been already de-serialized by the consumer. for (ConsumerRecord<K, V> record : records) { @@ -268,6 +246,52 @@ import org.slf4j.LoggerFactory; return results; } + // creates a separate thread for getting the messages. 
+ private Runnable createProxyThreadRunnable() { + Runnable runnable= () -> { + isRunning = true; + + try { + consumerPollThreadStartLatch.countDown(); + LOG.info("Starting runnable " + consumerPollThread.getName()); + initializeLags(); + while (isRunning) { + fetchMessages(); + } + } catch (Throwable throwable) { + LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable); + // KafkaSystemConsumer uses the failureCause to propagate the throwable to the container + failureCause = throwable; + isRunning = false; + } + + if (!isRunning) { + LOG.info("KafkaConsumerProxy for system {} has stopped.", systemName); + } + }; + + return runnable; + } + + private void initializeLags() { + // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag. + Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions2SSP.keySet()); + endOffsets.forEach((tp, offset) -> { + SystemStreamPartition ssp = topicPartitions2SSP.get(tp); + long startingOffset = nextOffsets.get(ssp); + // End offsets are the offset of the newest message + 1 + // If the message we are about to consume is < end offset, we are starting with a lag. + long initialLag = endOffsets.get(tp) - startingOffset; + + LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset); + latestLags.put(ssp, initialLag); + sink.setIsAtHighWatermark(ssp, initialLag == 0); + }); + + // initialize lag metrics + refreshLatencyMetrics(); + } + private int getRecordSize(ConsumerRecord<K, V> r) { int keySize = (r.key() == null) ? 0 : r.serializedKeySize(); return keySize + r.serializedValueSize(); @@ -291,9 +315,7 @@ import org.slf4j.LoggerFactory; kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark); } - /* - This method put messages into blockingEnvelopeMap. 
- */ + private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List<IncomingMessageEnvelope> envelopes) { long nextOffset = nextOffsets.get(ssp); @@ -317,11 +339,9 @@ import org.slf4j.LoggerFactory; } } - /* - The only way to figure out lag for the KafkaConsumer is to look at the metrics after each poll() call. - One of the metrics (records-lag) shows how far behind the HighWatermark the consumer is. - This method populates the lag information for each SSP into latestLags member variable. - */ + // The only way to figure out lag for the KafkaConsumer is to look at the metrics after each poll() call. + // One of the metrics (records-lag) shows how far behind the HighWatermark the consumer is. + // This method populates the lag information for each SSP into latestLags member variable. private void populateCurrentLags(Set<SystemStreamPartition> ssps) { Map<MetricName, ? extends Metric> consumerMetrics = kafkaConsumer.metrics(); @@ -339,12 +359,6 @@ import org.slf4j.LoggerFactory; // so the lag is now at least 0, which is the same as Samza's definition. // If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling. long currentLag = (currentLagM != null) ? (long) currentLagM.value() : -1L; - /* - Metric averageLagM = consumerMetrics.get(new MetricName(tp + ".records-lag-avg", "consumer-fetch-manager-metrics", "", tags)); - double averageLag = (averageLagM != null) ? averageLagM.value() : -1.0; - Metric maxLagM = consumerMetrics.get(new MetricName(tp + ".records-lag-max", "consumer-fetch-manager-metrics", "", tags)); - double maxLag = (maxLagM != null) ? maxLagM.value() : -1.0; - */ latestLags.put(ssp, currentLag); // calls the setIsAtHead for the BlockingEnvelopeMap @@ -352,10 +366,8 @@ import org.slf4j.LoggerFactory; } } - /* - Get the latest lag for a specific SSP. - */ - public long getLatestLag(SystemStreamPartition ssp) { + // Get the latest lag for a specific SSP. 
+ private long getLatestLag(SystemStreamPartition ssp) { Long lag = latestLags.get(ssp); if (lag == null) { throw new SamzaException("Unknown/unregistered ssp in latestLags request: " + ssp); @@ -363,9 +375,7 @@ import org.slf4j.LoggerFactory; return lag; } - /* - Using the consumer to poll the messages from the stream. - */ + // Using the consumer to poll the messages from the stream. private void fetchMessages() { Set<SystemStreamPartition> sspsToFetch = new HashSet<>(); for (SystemStreamPartition ssp : nextOffsets.keySet()) { @@ -380,7 +390,7 @@ import org.slf4j.LoggerFactory; Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response; LOG.debug("pollConsumer from following SSPs: {}; total#={}", sspsToFetch, sspsToFetch.size()); - response = pollConsumer(sspsToFetch, 500); // TODO should be default value from ConsumerConfig + response = pollConsumer(sspsToFetch, 500L); // move the responses into the queue for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) { @@ -430,27 +440,5 @@ import org.slf4j.LoggerFactory; Throwable getFailureCause() { return failureCause; } - - /** - * stop the thread and wait for it to stop - * @param timeoutMs how long to wait in join - */ - public void stop(long timeoutMs) { - LOG.info("Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName()); - - isRunning = false; - try { - consumerPollThread.join(timeoutMs); - // join returns event if the thread didn't finish - // in this case we should interrupt it and wait again - if (consumerPollThread.isAlive()) { - consumerPollThread.interrupt(); - consumerPollThread.join(timeoutMs); - } - } catch (InterruptedException e) { - LOG.warn("Join in KafkaConsumerProxy has failed", e); - consumerPollThread.interrupt(); - } - } } http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java 
---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java index 9101a89..e5ded8d 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java @@ -31,9 +31,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import kafka.common.TopicAndPartition; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.KafkaConsumerConfig; +import org.apache.samza.config.KafkaConsumerConfig; import org.apache.kafka.common.TopicPartition; -import org.apache.samza.Partition; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.KafkaConfig; @@ -56,32 +55,33 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy private final Consumer<K, V> kafkaConsumer; private final String systemName; - private final KafkaSystemConsumerMetrics samzaConsumerMetrics; private final String clientId; - private final String metricName; private final AtomicBoolean stopped = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false); private final Config config; private final boolean fetchThresholdBytesEnabled; + private final KafkaSystemConsumerMetrics metrics; // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap. - /* package private */final KafkaConsumerMessageSink messageSink; + final KafkaConsumerMessageSink messageSink; - // proxy is doing the actual reading + // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates + // BlockeingEnvelopMap's buffers. 
final private KafkaConsumerProxy proxy; - /* package private */final Map<TopicPartition, String> topicPartitions2Offset = new HashMap<>(); - /* package private */final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>(); - /* package private */ long perPartitionFetchThreshold; - /* package private */ long perPartitionFetchThresholdBytes; + // keep registration data until the start - mapping between registered SSPs and topicPartitions, and the offsets + final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>(); + final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>(); + + long perPartitionFetchThreshold; + long perPartitionFetchThresholdBytes; /** - * Constructor * @param systemName system name for which we create the consumer - * @param config config - * @param metrics metrics - * @param clock - system clock + * @param config config passed into the the app + * @param metrics metrics collecting object + * @param clock - system clock, allows to override internal clock (System.currentTimeMillis()) */ public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) { @@ -89,54 +89,50 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy super(metrics.registry(), clock, metrics.getClass().getName()); this.kafkaConsumer = kafkaConsumer; - this.samzaConsumerMetrics = metrics; this.clientId = clientId; this.systemName = systemName; this.config = config; - this.metricName = String.format("%s %s", systemName, clientId); + this.metrics = metrics; - this.fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName); + fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName); // create a sink for passing the messages between the proxy and the consumer messageSink = new KafkaConsumerMessageSink(); 
- // Create the proxy to do the actual message reading. It is a separate thread that reads the messages from the stream - // and puts them into the sink. - proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, samzaConsumerMetrics, metricName); - LOG.info("Created consumer proxy: " + proxy); + // Create the proxy to do the actual message reading. + String metricName = String.format("%s %s", systemName, clientId); + proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, metrics, metricName); + LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy ); - LOG.info("Created SamzaKafkaSystemConsumer for system={}, clientId={}, metricName={}, KafkaConsumer={}", systemName, - clientId, metricName, this.kafkaConsumer.toString()); + LOG.info("{}: Created KafkaSystemConsumer {}", this, kafkaConsumer); } /** - * create kafka consumer + * Create internal kafka consumer object, which will be used in the Proxy. * @param systemName system name for which we create the consumer * @param clientId client id to use int the kafka client * @param config config - * @return kafka consumer + * @return kafka consumer object */ public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String systemName, String clientId, Config config) { - Map<String, String> injectProps = new HashMap<>(); - // extract kafka client configs KafkaConsumerConfig consumerConfig = - KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId, injectProps); + KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId); - LOG.info("KafkaClient properties for systemName {}: {}", systemName, consumerConfig.originals()); + LOG.info("{}:{} KafkaClient properties {}", systemName, clientId, consumerConfig); - return new KafkaConsumer<>(consumerConfig.originals()); + return new KafkaConsumer(consumerConfig); } @Override public void start() { if (!started.compareAndSet(false, true)) { - LOG.warn("attempting to start the 
consumer for the second (or more) time."); + LOG.warn("{}: Attempting to start the consumer for the second (or more) time.", this); return; } if (stopped.get()) { - LOG.warn("attempting to start a stopped consumer"); + LOG.warn("{}: Attempting to start a stopped consumer", this); return; } // initialize the subscriptions for all the registered TopicPartitions @@ -145,58 +141,59 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy setFetchThresholds(); startConsumer(); - LOG.info("consumer {} started", this); + LOG.info("{}: Consumer started", this); } private void startSubscription() { //subscribe to all the registered TopicPartitions - LOG.info("consumer {}, subscribes to {} ", this, topicPartitions2SSP.keySet()); + LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet()); try { synchronized (kafkaConsumer) { // we are using assign (and not subscribe), so we need to specify both topic and partition - kafkaConsumer.assign(topicPartitions2SSP.keySet()); + kafkaConsumer.assign(topicPartitionsToSSP.keySet()); } } catch (Exception e) { - LOG.warn("startSubscription failed.", e); + LOG.warn("{}: Start subscription failed", this); throw new SamzaException(e); } } - /* - Set the offsets to start from. - Add the TopicPartitions to the proxy. - Start the proxy thread. + /** + * Set the offsets to start from. + * Register the TopicPartitions with the proxy. + * Start the proxy. 
*/ void startConsumer() { - //set the offset for each TopicPartition - if (topicPartitions2Offset.size() <= 0) { - LOG.warn("Consumer {} is not subscribed to any SSPs", this); + // set the offset for each TopicPartition + if (topicPartitionsToOffset.size() <= 0) { + LOG.warn("{}: Consumer is not subscribed to any SSPs", this); } - topicPartitions2Offset.forEach((tp, startingOffsetString) -> { + topicPartitionsToOffset.forEach((tp, startingOffsetString) -> { long startingOffset = Long.valueOf(startingOffsetString); try { synchronized (kafkaConsumer) { - // TODO in the future we may need to add special handling here for BEGIN/END_OFFSET - // this will call KafkaConsumer.seekToBegin/End() kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value } } catch (Exception e) { - // all other exceptions - non recoverable - LOG.error("Got Exception while seeking to " + startingOffsetString + " for " + tp, e); - throw new SamzaException(e); + // all recoverable exceptions are handled by the client. + // if we get here there is nothing left to do but bail out. 
+ String msg = String.format("%s: Got Exception while seeking to %s for partition %s", + this, startingOffsetString, tp); + LOG.error(msg, e); + throw new SamzaException(msg, e); } - LOG.info("Changing consumer's starting offset for tp = " + tp + " to " + startingOffsetString); + LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString); // add the partition to the proxy - proxy.addTopicPartition(topicPartitions2SSP.get(tp), startingOffset); + proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset); }); // start the proxy thread if (proxy != null && !proxy.isRunning()) { - LOG.info("Starting proxy: " + proxy); + LOG.info("{}: Starting proxy {}", this, proxy); proxy.start(); } } @@ -209,57 +206,59 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy long fetchThreshold = FETCH_THRESHOLD; if (fetchThresholdOption.isDefined()) { fetchThreshold = Long.valueOf(fetchThresholdOption.get()); - LOG.info("fetchThresholdOption is configured. fetchThreshold=" + fetchThreshold); + LOG.info("{}: fetchThresholdOption is configured. fetchThreshold={}", this, fetchThreshold); } Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName); long fetchThresholdBytes = FETCH_THRESHOLD_BYTES; if (fetchThresholdBytesOption.isDefined()) { fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get()); - LOG.info("fetchThresholdBytesOption is configured. fetchThresholdBytes=" + fetchThresholdBytes); + LOG.info("{}: fetchThresholdBytesOption is configured. 
fetchThresholdBytes={}", this, fetchThresholdBytes); } - int numTPs = topicPartitions2SSP.size(); - assert (numTPs == topicPartitions2Offset.size()); + int numTPs = topicPartitionsToSSP.size(); + if (numTPs != topicPartitionsToOffset.size()) { + throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()"); + } - LOG.info("fetchThresholdBytes = " + fetchThresholdBytes + "; fetchThreshold=" + fetchThreshold); - LOG.info("number of topicPartitions " + numTPs); + LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; partitions num={}", + this, fetchThresholdBytes, fetchThreshold, numTPs); if (numTPs > 0) { perPartitionFetchThreshold = fetchThreshold / numTPs; - LOG.info("perPartitionFetchThreshold=" + perPartitionFetchThreshold); + LOG.info("{}: perPartitionFetchThreshold={}", this, perPartitionFetchThreshold); if (fetchThresholdBytesEnabled) { // currently this feature cannot be enabled, because we do not have the size of the messages available. // messages get double buffered, hence divide by 2 perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs; - LOG.info("perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes=" - + perPartitionFetchThresholdBytes); + LOG.info("{}: perPartitionFetchThresholdBytes is enabled. 
perPartitionFetchThresholdBytes={}", + this, perPartitionFetchThresholdBytes); } } } @Override public void stop() { - LOG.info("Stopping Samza kafkaConsumer " + this); - if (!stopped.compareAndSet(false, true)) { - LOG.warn("attempting to stop stopped consumer."); + LOG.warn("{}: Attempting to stop stopped consumer.", this); return; } - // stop the proxy (with 5 minutes timeout) + LOG.info("{}: Stopping Samza kafkaConsumer ", this); + + // stop the proxy (with 1 minute timeout) if (proxy != null) { - LOG.info("Stopping proxy " + proxy); + LOG.info("{}: Stopping proxy {}", this, proxy); proxy.stop(TimeUnit.SECONDS.toMillis(60)); } try { synchronized (kafkaConsumer) { - LOG.info("Closing kafka consumer " + kafkaConsumer); + LOG.info("{}: Closing kafkaSystemConsumer {}", this, kafkaConsumer); kafkaConsumer.close(); } } catch (Exception e) { - LOG.warn("failed to stop SamzaRawKafkaConsumer + " + this, e); + LOG.warn("{}: Failed to stop KafkaSystemConsumer.", this, e); } } @@ -270,45 +269,45 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy public void register(SystemStreamPartition systemStreamPartition, String offset) { if (started.get()) { String msg = - String.format("Trying to register partition after consumer has been started. sn=%s, ssp=%s", systemName, - systemStreamPartition); - LOG.error(msg); + String.format("%s: Trying to register partition after consumer has been started. 
ssp=%s", + this, systemStreamPartition); throw new SamzaException(msg); } if (!systemStreamPartition.getSystem().equals(systemName)) { - LOG.warn("ignoring SSP " + systemStreamPartition + ", because this consumer's system is " + systemName); + LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition); return; } + LOG.info("{}: Registering ssp = {} with offset {}", this, systemStreamPartition, offset); + super.register(systemStreamPartition, offset); TopicPartition tp = toTopicPartition(systemStreamPartition); - topicPartitions2SSP.put(tp, systemStreamPartition); + topicPartitionsToSSP.put(tp, systemStreamPartition); - LOG.info("Registering ssp = " + systemStreamPartition + " with offset " + offset); - String existingOffset = topicPartitions2Offset.get(tp); + String existingOffset = topicPartitionsToOffset.get(tp); // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages. if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) { - topicPartitions2Offset.put(tp, offset); + topicPartitionsToOffset.put(tp, offset); } - samzaConsumerMetrics.registerTopicAndPartition(toTopicAndPartition(tp)); + metrics.registerTopicAndPartition(toTopicAndPartition(tp)); } /** * Compare two String offsets. - * Note. There is a method in KafkaAdmin that does that, but that would require instantiation of systemadmin for each consumer. + * Note. There is a method in KafkaSystemAdmin that does that, but that would require instantiation of systemadmin for each consumer. 
* @return see {@link Long#compareTo(Long)} */ - public static int compareOffsets(String offset1, String offset2) { + private static int compareOffsets(String offset1, String offset2) { return Long.valueOf(offset1).compareTo(Long.valueOf(offset2)); } @Override public String toString() { - return systemName + "/" + clientId + "/" + super.toString(); + return String.format("%s:%s", systemName, clientId); } @Override @@ -318,17 +317,11 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy // check if the proxy is running if (!proxy.isRunning()) { stop(); - if (proxy.getFailureCause() != null) { - String message = "KafkaConsumerProxy has stopped"; - throw new SamzaException(message, proxy.getFailureCause()); - } else { - LOG.warn("Failure cause is not populated for KafkaConsumerProxy"); - throw new SamzaException("KafkaConsumerProxy has stopped"); - } + String message = String.format("%s: KafkaConsumerProxy has stopped.", this); + throw new SamzaException(message, proxy.getFailureCause()); } - Map<SystemStreamPartition, List<IncomingMessageEnvelope>> res = super.poll(systemStreamPartitions, timeout); - return res; + return super.poll(systemStreamPartitions, timeout); } /** @@ -353,9 +346,6 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy return systemName; } - //////////////////////////////////// - // inner class for the message sink - //////////////////////////////////// public class KafkaConsumerMessageSink { public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) { @@ -363,8 +353,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy } boolean needsMoreMessages(SystemStreamPartition ssp) { - LOG.debug("needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};" - + "(limit={}); messagesNumInQueue={}(limit={};", ssp, fetchThresholdBytesEnabled, + LOG.debug("{}: needsMoreMessages from following SSP: {}. 
fetchLimitByBytes enabled={}; messagesSizeInQueue={};" + + "(limit={}); messagesNumInQueue={}(limit={};", this, ssp, fetchThresholdBytesEnabled, getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp), perPartitionFetchThreshold); @@ -376,16 +366,15 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy } void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) { - LOG.trace("Incoming message ssp = {}: envelope = {}.", ssp, envelope); + LOG.trace("{}: Incoming message ssp = {}: envelope = {}.", this, ssp, envelope); try { put(ssp, envelope); } catch (InterruptedException e) { throw new SamzaException( - String.format("Interrupted while trying to add message with offset %s for ssp %s", envelope.getOffset(), - ssp)); + String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s", + this, envelope.getOffset(), ssp)); } } - } // end of KafkaMessageSink class - /////////////////////////////////////////////////////////////////////////// + } } http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala index 7dce261..c4552e6 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala @@ -50,7 +50,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr clientBytesRead.put(clientName, newCounter("%s-bytes-read" format clientName)) clientReads.put((clientName), newCounter("%s-messages-read" format clientName)) 
clientSkippedFetchRequests.put((clientName), newCounter("%s-skipped-fetch-requests" format clientName)) - topicPartitions.put(clientName, newGauge("%s-topic-partitions" format clientName, 0)) + topicPartitions.put(clientName, newGauge("%s-registered-topic-partitions" format clientName, 0)) } // java friendlier interfaces http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index 5342b08..deaee56 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -22,7 +22,7 @@ package org.apache.samza.system.kafka import java.util.Properties import kafka.utils.ZkUtils -import org.apache.kafka.clients.consumer.{KafkaConsumer, KafkaConsumerConfig} +import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer.KafkaProducer import org.apache.samza.SamzaException import org.apache.samza.config.ApplicationConfig.ApplicationMode @@ -30,7 +30,7 @@ import org.apache.samza.config.KafkaConfig.Config2Kafka import org.apache.samza.config.StorageConfig._ import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.TaskConfig.Config2Task -import org.apache.samza.config.{ApplicationConfig, Config, KafkaConfig, StreamConfig} +import org.apache.samza.config._ import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer} import org.apache.samza.util._ http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java 
---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java deleted file mode 100644 index 264098b..0000000 --- a/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.config.MapConfig; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - - -public class TestKafkaConsumerConfig { - private final Map<String, String> props = new HashMap<>(); - public final static String SYSTEM_NAME = "testSystem"; - public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer."; - public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." 
+ SYSTEM_NAME + ".consumer."; - private final static String CLIENT_ID = "clientId"; - - @Before - public void setProps() { - - } - - @Test - public void testDefaultsAndOverrides() { - - Map<String, String> overrides = new HashMap<>(); - overrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored - overrides.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "Ignore"); // should be ignored - overrides.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // should NOT be ignored - - // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored - props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignroeThis:9092"); - props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092"); - - // should be overridden - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "true"); //ignore - props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); // ignore - - - // should be overridden - props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "200"); - - Config config = new MapConfig(props); - KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig( - config, SYSTEM_NAME, CLIENT_ID, overrides); - - Assert.assertEquals(kafkaConsumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), false); - - Assert.assertEquals( - kafkaConsumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), - Integer.valueOf(KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS)); - - Assert.assertEquals( - kafkaConsumerConfig.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG).get(0), - RangeAssignor.class.getName()); - - Assert.assertEquals( - kafkaConsumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0), - "useThis:9092"); - Assert.assertEquals( - kafkaConsumerConfig.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG).longValue(), - 100); - - Assert.assertEquals( - kafkaConsumerConfig.getClass(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), - 
ByteArrayDeserializer.class); - - Assert.assertEquals( - kafkaConsumerConfig.getClass(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), - ByteArrayDeserializer.class); - - Assert.assertEquals( - kafkaConsumerConfig.getString(ConsumerConfig.CLIENT_ID_CONFIG), - CLIENT_ID); - - Assert.assertEquals( - kafkaConsumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG), - KafkaConsumerConfig.getConsumerGroupId(config)); - } - - @Test - // test stuff that should not be overridden - public void testNotOverride() { - - // if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used - props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092"); - props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName()); - props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName()); - - - Config config = new MapConfig(props); - KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig( - config, SYSTEM_NAME, CLIENT_ID, Collections.emptyMap()); - - Assert.assertEquals( - kafkaConsumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0), - "useThis:9092"); - - Assert.assertEquals( - kafkaConsumerConfig.getClass(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), - TestKafkaConsumerConfig.class); - - Assert.assertEquals( - kafkaConsumerConfig.getClass(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), - TestKafkaConsumerConfig.class); - } - - - - @Test(expected = SamzaException.class) - public void testNoBootstrapServers() { - KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig( - new MapConfig(Collections.emptyMap()), SYSTEM_NAME, "clientId", Collections.emptyMap()); - - Assert.fail("didn't get exception for the missing config:" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); - } -} 
http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java new file mode 100644 index 0000000..719ea22 --- /dev/null +++ b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.samza.config; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.samza.SamzaException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +public class TestKafkaConsumerConfig { + private final Map<String, String> props = new HashMap<>(); + public final static String SYSTEM_NAME = "testSystem"; + public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer."; + public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer."; + private final static String CLIENT_ID = "clientId"; + + @Before + public void setProps() { + + } + + @Test + public void testDefaults() { + + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "Ignore"); // should be ignored + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // should NOT be ignored + + // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored + props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignroeThis:9092"); + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092"); + + Config config = new MapConfig(props); + KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig( + config, SYSTEM_NAME, CLIENT_ID); + + Assert.assertEquals("false", kafkaConsumerConfig.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); + + Assert.assertEquals( + KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS, + kafkaConsumerConfig.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); + + 
Assert.assertEquals( + RangeAssignor.class.getName(), + kafkaConsumerConfig.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)); + + Assert.assertEquals( + "useThis:9092", + kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + Assert.assertEquals( + "100", + kafkaConsumerConfig.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); + + Assert.assertEquals( + ByteArrayDeserializer.class.getName(), + kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); + + Assert.assertEquals( + ByteArrayDeserializer.class.getName(), + kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) ); + + Assert.assertEquals( + CLIENT_ID, + kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG)); + + Assert.assertEquals( + KafkaConsumerConfig.getConsumerGroupId(config), + kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG)); + } + + @Test + // test stuff that should not be overridden + public void testNotOverride() { + + // if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used + props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092"); + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName()); + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName()); + + + Config config = new MapConfig(props); + KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig( + config, SYSTEM_NAME, CLIENT_ID); + + Assert.assertEquals( + "useThis:9092", + kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + + Assert.assertEquals( + TestKafkaConsumerConfig.class.getName(), + kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); + + Assert.assertEquals( + TestKafkaConsumerConfig.class.getName(), + kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); + } + + 
@Test + public void testGetConsumerClientId() { + Map<String, String> map = new HashMap<>(); + + map.put(JobConfig.JOB_NAME(), "jobName"); + map.put(JobConfig.JOB_ID(), "jobId"); + String result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map)); + Assert.assertEquals("consumer-jobName-jobId", result); + + result = KafkaConsumerConfig.getConsumerClientId("consumer-", new MapConfig(map)); + Assert.assertEquals("consumer_-jobName-jobId", result); + + result = KafkaConsumerConfig.getConsumerClientId("super-duper-consumer", new MapConfig(map)); + Assert.assertEquals("super_duper_consumer-jobName-jobId", result); + + map.put(JobConfig.JOB_NAME(), " very important!job"); + result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map)); + Assert.assertEquals("consumer-_very_important_job-jobId", result); + + map.put(JobConfig.JOB_ID(), "number-#3"); + result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map)); + Assert.assertEquals("consumer-_very_important_job-number__3", result); + } + + + + @Test(expected = SamzaException.class) + public void testNoBootstrapServers() { + KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig( + new MapConfig(Collections.emptyMap()), SYSTEM_NAME, "clientId"); + + Assert.fail("didn't get exception for the missing config:" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java index d90bc35..9e8ff44 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java +++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java @@ -27,7 +27,7 @@ import java.util.Map; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.KafkaConsumerConfig; +import org.apache.samza.config.KafkaConsumerConfig; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.samza.Partition; import org.apache.samza.config.Config; @@ -67,8 +67,8 @@ public class TestKafkaSystemConsumer { Config config = new MapConfig(map); KafkaConsumerConfig consumerConfig = - KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID, Collections.emptyMap()); - final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig.originals()); + KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID); + final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig); MockKafkaSystmeCosumer newKafkaSystemConsumer = new MockKafkaSystmeCosumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID, @@ -116,9 +116,9 @@ public class TestKafkaSystemConsumer { consumer.register(ssp1, "3"); consumer.register(ssp2, "0"); - assertEquals("0", consumer.topicPartitions2Offset.get(KafkaSystemConsumer.toTopicPartition(ssp0))); - assertEquals("2", consumer.topicPartitions2Offset.get(KafkaSystemConsumer.toTopicPartition(ssp1))); - assertEquals("0", consumer.topicPartitions2Offset.get(KafkaSystemConsumer.toTopicPartition(ssp2))); + assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0))); + assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1))); + assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2))); } @Test
