NewSystemConsumer for Kafka system: remove SimpleConsumer and BrokerProxy from Samza's KafkaSystemConsumer implementation. Instead, use KafkaConsumerProxy with the high-level Kafka consumer.
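At a high level, this change replaces the per-broker SimpleConsumer fetch threads with a single high-level KafkaConsumer that is manually assigned its partitions, seeked to checkpointed offsets, and polled from one proxy thread. A minimal sketch of that consumption pattern (illustrative only, not the Samza implementation; the broker address, topic, and offset below are made-up values):

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class HighLevelConsumerSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("group.id", "my-job-1"); // Samza derives this from job.name and job.id
        props.put("enable.auto.commit", "false"); // Samza manages offsets itself
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        TopicPartition tp = new TopicPartition("my-topic", 0); // hypothetical topic/partition
        consumer.assign(Arrays.asList(tp)); // assign(), not subscribe(): the framework picks partitions
        consumer.seek(tp, 42L); // start from a previously checkpointed offset
        try {
          ConsumerRecords<byte[], byte[]> records = consumer.poll(500L); // same long-timeout poll the proxy uses
          for (ConsumerRecord<byte[], byte[]> record : records) {
            System.out.printf("offset=%d size=%d%n", record.offset(), record.serializedValueSize());
          }
        } finally {
          consumer.close();
        }
      }
    }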
Author: Boris S <[email protected]> Author: Boris Shkolnik <[email protected]> Reviewers: Shanthoosh Venktataraman <[email protected]>, Prateek Maheshwari <[email protected]> Closes #624 from sborya/NewConsumer2 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/003ad106 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/003ad106 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/003ad106 Branch: refs/heads/master Commit: 003ad1068ac7d36f462b49ded0dbb2e4dedc1089 Parents: 078afb5 Author: Boris S <[email protected]> Authored: Tue Sep 25 18:07:44 2018 -0700 Committer: Boris S <[email protected]> Committed: Tue Sep 25 18:07:44 2018 -0700 ---------------------------------------------------------------------- .../samza/system/IncomingMessageEnvelope.java | 3 +- .../ClusterBasedJobCoordinator.java | 2 +- .../apache/samza/storage/StorageRecovery.java | 2 +- .../samza/checkpoint/CheckpointTool.scala | 2 +- .../apache/samza/container/SamzaContainer.scala | 2 +- .../samza/coordinator/JobModelManager.scala | 6 +- .../samza/job/local/ProcessJobFactory.scala | 3 +- .../samza/job/local/ThreadJobFactory.scala | 20 +- .../samza/coordinator/TestJobCoordinator.scala | 4 +- .../org/apache/samza/config/KafkaConfig.scala | 5 +- .../samza/config/KafkaConsumerConfig.java | 210 +++++++++ .../apache/samza/system/kafka/BrokerProxy.scala | 332 -------------- .../samza/system/kafka/KafkaConsumerProxy.java | 423 ++++++++++++++++++ .../samza/system/kafka/KafkaSystemConsumer.java | 371 ++++++++++++++++ .../system/kafka/KafkaSystemConsumer.scala | 309 ------------- .../kafka/KafkaSystemConsumerMetrics.scala | 68 ++- .../samza/system/kafka/KafkaSystemFactory.scala | 80 ++-- .../samza/config/TestKafkaConsumerConfig.java | 150 +++++++ .../system/kafka/TestKafkaSystemAdminJava.java | 18 +- .../samza/system/kafka/TestBrokerProxy.scala | 434 ------------------- .../system/kafka/TestKafkaSystemConsumer.java | 220 ++++++++++ .../system/kafka/TestKafkaSystemConsumer.scala | 191 -------- .../test/integration/StreamTaskTestUtil.scala | 17 +- .../integration/TestShutdownStatefulTask.scala | 4 +- .../samza/validation/YarnJobValidationTool.java | 2 +- .../yarn/TestSamzaYarnAppMasterService.scala | 4 +- 26 files changed, 1491 insertions(+), 1391 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java index 4d0ce2f..c5aed31 100644 --- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java +++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java @@ -59,7 +59,8 @@ public class IncomingMessageEnvelope { * @param message A deserialized message received from the partition offset. * @param size size of the message and key in bytes. 
*/ - public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key, Object message, int size) { + public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, + Object key, Object message, int size) { this.systemStreamPartition = systemStreamPartition; this.offset = offset; this.key = key; http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index 016d171..12e26f7 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -174,7 +174,7 @@ public class ClusterBasedJobCoordinator { // build a JobModelManager and ChangelogStreamManager and perform partition assignments. changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager); - jobModelManager = JobModelManager.apply(coordinatorStreamManager, changelogStreamManager.readPartitionMapping()); + jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping()); config = jobModelManager.jobModel().getConfig(); hasDurableStores = new StorageConfig(config).hasDurableStores(); http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java index bf46018..9a76d75 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -131,7 +131,7 @@ public class StorageRecovery extends CommandLine { coordinatorStreamManager.start(); coordinatorStreamManager.bootstrap(); ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager); - JobModel jobModel = JobModelManager.apply(coordinatorStreamManager, changelogStreamManager.readPartitionMapping()).jobModel(); + JobModel jobModel = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping()).jobModel(); containers = jobModel.getContainers(); coordinatorStreamManager.stop(); } http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala index 0ca8a3d..65fb419 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala @@ -170,7 +170,7 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap, manage coordinatorStreamManager.start coordinatorStreamManager.bootstrap val changelogManager = new ChangelogStreamManager(coordinatorStreamManager) - val jobModelManager = 
JobModelManager(coordinatorStreamManager, changelogManager.readPartitionMapping()) + val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, changelogManager.readPartitionMapping()) val taskNames = jobModelManager .jobModel .getContainers http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 7b64f5e..fba7329 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -822,7 +822,7 @@ class SamzaContainer( } try { - info("Shutting down.") + info("Shutting down SamzaContainer.") removeShutdownHook jmxServer.stop http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index f7698c0..600b7a1 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -35,7 +35,6 @@ import org.apache.samza.container.LocalityManager import org.apache.samza.container.TaskName import org.apache.samza.coordinator.server.HttpServer import org.apache.samza.coordinator.server.JobServlet -import org.apache.samza.coordinator.stream.CoordinatorStreamManager import org.apache.samza.job.model.JobModel import org.apache.samza.job.model.TaskModel import org.apache.samza.metrics.MetricsRegistryMap @@ -64,12 +63,11 @@ object JobModelManager extends Logging { * a) Reads the jobModel from coordinator stream using the job's configuration. * b) Recomputes changelog partition mapping based on jobModel and job's configuration. * c) Builds JobModelManager using the jobModel read from coordinator stream. - * @param coordinatorStreamManager Coordinator stream manager. + * @param config Config from the coordinator stream. * @param changelogPartitionMapping The changelog partition-to-task mapping. 
* @return JobModelManager */ - def apply(coordinatorStreamManager: CoordinatorStreamManager, changelogPartitionMapping: util.Map[TaskName, Integer]) = { - val config = coordinatorStreamManager.getConfig + def apply(config: Config, changelogPartitionMapping: util.Map[TaskName, Integer]) = { val localityManager = new LocalityManager(config, new MetricsRegistryMap()) // Map the name of each system to the corresponding SystemAdmin http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala index 642a484..64f516b 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala @@ -50,7 +50,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging { coordinatorStreamManager.bootstrap val changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager) - val coordinator = JobModelManager(coordinatorStreamManager, changelogStreamManager.readPartitionMapping()) + val coordinator = JobModelManager(coordinatorStreamManager.getConfig, changelogStreamManager.readPartitionMapping()) val jobModel = coordinator.jobModel val taskPartitionMappings: util.Map[TaskName, Integer] = new util.HashMap[TaskName, Integer] @@ -61,6 +61,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging { } changelogStreamManager.writePartitionMapping(taskPartitionMappings) + coordinatorStreamManager.stop() //create necessary checkpoint and changelog streams val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry) http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index abd7f65..bec4ec0 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -20,9 +20,9 @@ package org.apache.samza.job.local import org.apache.samza.application.{ApplicationDescriptorUtil, ApplicationUtil} -import org.apache.samza.config.{Config, TaskConfigJava} import org.apache.samza.config.JobConfig._ import org.apache.samza.config.ShellCommandConfig._ +import org.apache.samza.config.{Config, TaskConfigJava} import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, TaskName} import org.apache.samza.coordinator.JobModelManager import org.apache.samza.coordinator.stream.CoordinatorStreamManager @@ -30,16 +30,15 @@ import org.apache.samza.job.{StreamJob, StreamJobFactory} import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap, MetricsReporter} import org.apache.samza.runtime.ProcessorContext import org.apache.samza.storage.ChangelogStreamManager -import org.apache.samza.task.TaskFactory -import org.apache.samza.task.TaskFactoryUtil +import org.apache.samza.task.{TaskFactory, TaskFactoryUtil} import org.apache.samza.util.Logging import scala.collection.JavaConversions._ import scala.collection.mutable /** - * Creates a 
new Thread job with the given config - */ + * Creates a new Thread job with the given config + */ class ThreadJobFactory extends StreamJobFactory with Logging { def getJob(config: Config): StreamJob = { info("Creating a ThreadJob, which is only meant for debugging.") @@ -51,7 +50,8 @@ class ThreadJobFactory extends StreamJobFactory with Logging { coordinatorStreamManager.bootstrap val changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager) - val coordinator = JobModelManager(coordinatorStreamManager, changelogStreamManager.readPartitionMapping()) + val coordinator = JobModelManager(coordinatorStreamManager.getConfig, changelogStreamManager.readPartitionMapping()) + val jobModel = coordinator.jobModel val taskPartitionMappings: mutable.Map[TaskName, Integer] = mutable.Map[TaskName, Integer]() @@ -67,6 +67,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging { val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry) if (checkpointManager != null) { checkpointManager.createResources() + checkpointManager.stop() } ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, jobModel.maxChangeLogStreamPartitions) @@ -74,17 +75,17 @@ class ThreadJobFactory extends StreamJobFactory with Logging { val jmxServer = new JmxServer val appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config) - val taskFactory : TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc) + val taskFactory: TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc) // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job. config.getTaskOpts match { case Some(taskOpts) => warn("%s was specified in config, but is not being used because job is being executed with ThreadJob. " + - "You probably want to run %s=%s." format (TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName)) + "You probably want to run %s=%s." 
format(TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName)) case _ => None } val containerListener = { - val processorLifecycleListener = appDesc.getProcessorLifecycleListenerFactory().createInstance(new ProcessorContext() { }, config) + val processorLifecycleListener = appDesc.getProcessorLifecycleListenerFactory().createInstance(new ProcessorContext() {}, config) new SamzaContainerListener { override def afterFailure(t: Throwable): Unit = { processorLifecycleListener.afterFailure(t) @@ -120,6 +121,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging { threadJob } finally { coordinator.stop + coordinatorStreamManager.stop() jmxServer.stop } } http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala index 2488355..b7a9bec 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala @@ -275,7 +275,9 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester { coordinatorStreamManager.start coordinatorStreamManager.bootstrap val changelogPartitionManager = new ChangelogStreamManager(coordinatorStreamManager) - JobModelManager(coordinatorStreamManager, changelogPartitionManager.readPartitionMapping()) + val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, changelogPartitionManager.readPartitionMapping()) + coordinatorStreamManager.stop() + jobModelManager } @Before http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 26664ea..ef43e72 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -289,7 +289,10 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { properties } - // kafka config + /** + * @deprecated Use KafkaConsumerConfig + */ + @Deprecated def getKafkaSystemConsumerConfig( systemName: String, clientId: String, groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString, http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..3fa66e5 --- /dev/null +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java @@ -0,0 +1,210 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.samza.config; + +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.samza.SamzaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.runtime.AbstractFunction0; + + +/** + * The configuration class for KafkaConsumer + */ +public class KafkaConsumerConfig extends HashMap<String, Object> { + + public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class); + + static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer"; + static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer"; + static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin"; + + /* + * By default, KafkaConsumer will fetch some big number of available messages for all the partitions. + * This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll(). + */ + static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100"; + + private KafkaConsumerConfig(Map<String, Object> map) { + super(map); + } + + /** + * Helper method to create configs for use in the Kafka consumer. + * The values are based on the "consumer" subset of the configs provided by the app and Samza overrides. + * + * @param config config provided by the app. + * @param systemName system name to get the consumer configuration for. + * @param clientId client id to be used in the Kafka consumer. + * @return KafkaConsumerConfig + */ + public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) { + + Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true); + + // Kafka client configuration + String groupId = getConsumerGroupId(config); + + Map<String, Object> consumerProps = new HashMap<>(); + consumerProps.putAll(subConf); + + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); + + // These are values we enforce in Samza, and they cannot be overwritten.
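+ // (e.g., a user-supplied systems.<system>.consumer.enable.auto.commit is overridden below, since these puts run after consumerProps.putAll(subConf))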
+ + // Disable consumer auto-commit because Samza controls commits + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + // Translate the Samza config value to the Kafka config value + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))); + + // if consumer bootstrap servers are not configured, get them from the producer configs + if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + String bootstrapServers = + config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + if (StringUtils.isEmpty(bootstrapServers)) { + throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for " + systemName); + } + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } + + // Always use default partition assignment strategy. Do not allow override. + consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); + + // the consumer is fully typed, and deserialization can be too. But in case it is not provided we should + // default to byte[] + if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { + LOG.info("setting key deserialization for the consumer (for system {}) to ByteArrayDeserializer", systemName); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + } + if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { + LOG.info("setting value deserialization for the consumer (for system {}) to ByteArrayDeserializer", systemName); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + } + + // Apply the default max poll records limit if the user did not configure one + consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS); + + return new KafkaConsumerConfig(consumerProps); + } + + // group id should be unique per job + static String getConsumerGroupId(Config config) { + JobConfig jobConfig = new JobConfig(config); + Option jobNameOption = jobConfig.getName(); + if (jobNameOption.isEmpty()) { + throw new ConfigException("Missing job name"); + } + String jobName = (String) jobNameOption.get(); + + Option jobIdOption = jobConfig.getJobId(); + String jobId = "1"; + if (! jobIdOption.isEmpty()) { + jobId = (String) jobIdOption.get(); + } + + return String.format("%s-%s", jobName, jobId); + } + + // client id should be unique per job + public static String getConsumerClientId(Config config) { + return getConsumerClientId(CONSUMER_CLIENT_ID_PREFIX, config); + } + + public static String getProducerClientId(Config config) { + return getConsumerClientId(PRODUCER_CLIENT_ID_PREFIX, config); + } + + public static String getAdminClientId(Config config) { + return getConsumerClientId(ADMIN_CLIENT_ID_PREFIX, config); + } + + static String getConsumerClientId(String id, Config config) { + JobConfig jobConfig = new JobConfig(config); + Option jobNameOption = jobConfig.getName(); + if (jobNameOption.isEmpty()) { + throw new ConfigException("Missing job name"); + } + String jobName = (String) jobNameOption.get(); + + Option jobIdOption = jobConfig.getJobId(); + String jobId = "1"; + if (!
jobIdOption.isEmpty()) { + jobId = (String) jobIdOption.get(); + } + + return String.format("%s-%s-%s", id.replaceAll("\\W", "_"), jobName.replaceAll("\\W", "_"), + jobId.replaceAll("\\W", "_")); + } + + /** + * If settings for auto.reset in samza are different from settings in Kafka (auto.offset.reset), + * then need to convert them (see kafka.apache.org/documentation): + * "largest" -> "latest" + * "smallest" -> "earliest" + * + * If no setting specified we return "latest" (same as Kafka). + * @param autoOffsetReset value from the app provided config + * @return String representing the config value for "auto.offset.reset" property + */ + static String getAutoOffsetResetValue(final String autoOffsetReset) { + final String SAMZA_OFFSET_LARGEST = "largest"; + final String SAMZA_OFFSET_SMALLEST = "smallest"; + final String KAFKA_OFFSET_LATEST = "latest"; + final String KAFKA_OFFSET_EARLIEST = "earliest"; + final String KAFKA_OFFSET_NONE = "none"; + + if (autoOffsetReset == null) { + return KAFKA_OFFSET_LATEST; // return default + } + + // accept kafka values directly + if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST) + || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) { + return autoOffsetReset; + } + + String newAutoOffsetReset; + switch (autoOffsetReset) { + case SAMZA_OFFSET_LARGEST: + newAutoOffsetReset = KAFKA_OFFSET_LATEST; + break; + case SAMZA_OFFSET_SMALLEST: + newAutoOffsetReset = KAFKA_OFFSET_EARLIEST; + break; + default: + newAutoOffsetReset = KAFKA_OFFSET_LATEST; + } + LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset); + return newAutoOffsetReset; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala deleted file mode 100644 index 423b68a..0000000 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ /dev/null @@ -1,332 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -package org.apache.samza.system.kafka - -import java.lang.Thread.UncaughtExceptionHandler -import java.nio.channels.ClosedByInterruptException -import java.util.concurrent.{ConcurrentHashMap, CountDownLatch} - -import kafka.api._ -import kafka.common.{ErrorMapping, NotLeaderForPartitionException, TopicAndPartition, UnknownTopicOrPartitionException} -import kafka.consumer.ConsumerConfig -import kafka.message.MessageSet -import org.apache.samza.SamzaException -import org.apache.samza.util.ExponentialSleepStrategy -import org.apache.samza.util.KafkaUtil -import org.apache.samza.util.Logging - -import scala.collection.JavaConverters._ -import scala.collection.concurrent - -/** - * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing - * a way for consumers to retrieve those messages by topic and partition. - */ -class BrokerProxy( - val host: String, - val port: Int, - val system: String, - val clientID: String, - val metrics: KafkaSystemConsumerMetrics, - val messageSink: MessageSink, - val timeout: Int = ConsumerConfig.SocketTimeout, - val bufferSize: Int = ConsumerConfig.SocketBufferSize, - val fetchSize: StreamFetchSizes = new StreamFetchSizes, - val consumerMinSize:Int = ConsumerConfig.MinFetchBytes, - val consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs, - offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging { - - /** - * How long should the fetcher thread sleep before checking if any TopicPartitions has been added to its purview - */ - val sleepMSWhileNoTopicPartitions = 100 - - /** What's the next offset for a particular partition? **/ - val nextOffsets:concurrent.Map[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]().asScala - - /** Block on the first call to get message if the fetcher has not yet returned its initial results **/ - // TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but - // VisualVM was showing the consumer thread spending all its time in the await method rather than returning - // immediately, even though the process was proceeding normally. Hence the extra boolean. Should be investigated. - val firstCallBarrier = new CountDownLatch(1) - var firstCall = true - - var simpleConsumer = createSimpleConsumer() - - metrics.registerBrokerProxy(host, port) - - def createSimpleConsumer() = { - val hostString = "%s:%d" format (host, port) - info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system)) - - val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID, fetchSize, consumerMinSize, consumerMaxWait) - sc - } - - def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = { - debug("Adding new topic and partition %s to queue for %s" format (tp, host)) - - if (nextOffsets.asJava.containsKey(tp)) { - toss("Already consuming TopicPartition %s" format tp) - } - - val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) { - nextOffset - .get - .toLong - } else { - warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp)) - - offsetGetter.getResetOffset(simpleConsumer, tp) - } - - debug("Got offset %s for new topic and partition %s." 
format (offset, tp)) - - nextOffsets += tp -> offset - - metrics.topicPartitions.get((host, port)).set(nextOffsets.size) - } - - def removeTopicPartition(tp: TopicAndPartition) = { - if (nextOffsets.asJava.containsKey(tp)) { - val offset = nextOffsets.remove(tp) - metrics.topicPartitions.get((host, port)).set(nextOffsets.size) - debug("Removed %s" format tp) - offset - } else { - warn("Asked to remove topic and partition %s, but not in map (keys = %s)" format (tp, nextOffsets.keys.mkString(","))) - None - } - } - - val thread = new Thread(new Runnable { - def run { - var reconnect = false - - try { - (new ExponentialSleepStrategy).run( - loop => { - if (reconnect) { - metrics.reconnects.get((host, port)).inc - simpleConsumer.close() - simpleConsumer = createSimpleConsumer() - } - - while (!Thread.currentThread.isInterrupted) { - messageSink.refreshDropped - if (nextOffsets.size == 0) { - debug("No TopicPartitions to fetch. Sleeping.") - Thread.sleep(sleepMSWhileNoTopicPartitions) - } else { - fetchMessages - - // If we got here, fetchMessages didn't throw an exception, i.e. it was successful. - // In that case, reset the loop delay, so that the next time an error occurs, - // we start with a short retry delay. - loop.reset - } - } - }, - - (exception, loop) => { - warn("Restarting consumer due to %s. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace." format exception) - debug("Exception detail:", exception) - abdicateAll - reconnect = true - }) - } catch { - case e: InterruptedException => info("Got interrupt exception in broker proxy thread.") - case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.") - case e: OutOfMemoryError => throw new SamzaException("Got out of memory error in broker proxy thread.") - case e: StackOverflowError => throw new SamzaException("Got stack overflow error in broker proxy thread.") - } - - if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt.") - } - }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID)) - - private def fetchMessages(): Unit = { - val topicAndPartitionsToFetch = nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList - - if (topicAndPartitionsToFetch.size > 0) { - metrics.brokerReads.get((host, port)).inc - val response: FetchResponse = simpleConsumer.defaultFetch(topicAndPartitionsToFetch: _*) - firstCall = false - firstCallBarrier.countDown() - - // Split response into errors and non errors, processing the errors first - val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error.code() == ErrorMapping.NoError) - - handleErrors(errorResponses, response) - - nonErrorResponses.foreach { case (tp, data) => moveMessagesToTheirQueue(tp, data) } - } else { - refreshLatencyMetrics - - debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions)) - - metrics.brokerSkippedFetchRequests.get((host, port)).inc - - Thread.sleep(sleepMSWhileNoTopicPartitions) - } - } - - /** - * Releases ownership for a single TopicAndPartition. The - * KafkaSystemConsumer will try and find a new broker for the - * TopicAndPartition. - */ - def abdicate(tp: TopicAndPartition) = removeTopicPartition(tp) match { - // Need to be mindful of a tp that was removed by another thread - case Some(offset) => messageSink.abdicate(tp, offset) - case None => warn("Tried to abdicate for topic partition not in map. 
Removed in interim?") - } - - /** - * Releases all TopicAndPartition ownership for this BrokerProxy thread. The - * KafkaSystemConsumer will try and find a new broker for the - * TopicAndPartition. - */ - def abdicateAll { - info("Abdicating all topic partitions.") - val immutableNextOffsetsCopy = nextOffsets.toMap - immutableNextOffsetsCopy.keySet.foreach(abdicate(_)) - } - - def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response: FetchResponse) = { - // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves - case class Error(tp: TopicAndPartition, code: Short, exception: Exception) - - // Now subdivide the errors into three types: non-recoverable, not leader (== abdicate) and offset out of range (== get new offset) - - // Convert FetchResponse into easier-to-work-with Errors - val errors = for ( - (topicAndPartition, responseData) <- errorResponses; - error <- Option(response.error(topicAndPartition.topic, topicAndPartition.partition)) // Scala's being cranky about referring to error.getKey values... - ) yield new Error(topicAndPartition, error.code(), error.exception()) - - val (notLeaderOrUnknownTopic, otherErrors) = errors.partition { case (e) => e.code == ErrorMapping.NotLeaderForPartitionCode || e.code == ErrorMapping.UnknownTopicOrPartitionCode } - val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode) - - // Can recover from two types of errors: not leader (go find the new leader) and offset out of range (go get the new offset) - // However, we want to bail as quickly as possible if there are non recoverable errors so that the state of the other - // topic-partitions remains the same. That way, when we've rebuilt the simple consumer, we can come around and - // handle the recoverable errors. - remainingErrors.foreach(e => { - warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(",")) - KafkaUtil.maybeThrowException(e.exception) }) - - notLeaderOrUnknownTopic.foreach(e => { - warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating" format(e.code, e.tp)) - abdicate(e.tp) - }) - - offsetOutOfRangeErrors.foreach(e => { - warn("Received OffsetOutOfRange exception for %s. Current offset = %s" format (e.tp, nextOffsets.getOrElse(e.tp, "not found in map, likely removed in the interim"))) - - try { - val newOffset = offsetGetter.getResetOffset(simpleConsumer, e.tp) - // Put the new offset into the map (if the tp still exists). Will catch it on the next go-around - nextOffsets.replace(e.tp, newOffset) - } catch { - // UnknownTopic or NotLeader are routine events and handled via abdication. All others, bail. - case _ @ (_:UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) => warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. 
Abdicating" format(e.code, e.tp)) - abdicate(e.tp) - } - }) - } - - def moveMessagesToTheirQueue(tp: TopicAndPartition, data: FetchResponsePartitionData) = { - val messageSet: MessageSet = data.messages - var nextOffset = nextOffsets(tp) - - messageSink.setIsAtHighWatermark(tp, data.hw == 0 || data.hw == nextOffset) - require(messageSet != null) - for (message <- messageSet.iterator) { - messageSink.addMessage(tp, message, data.hw) // TODO: Verify this is correct - - nextOffset = message.nextOffset - - val bytesSize = message.message.payloadSize + message.message.keySize - metrics.reads.get(tp).inc - metrics.bytesRead.get(tp).inc(bytesSize) - metrics.brokerBytesRead.get((host, port)).inc(bytesSize) - metrics.offsets.get(tp).set(nextOffset) - } - - nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching. - - // Update high water mark - val hw = data.hw - if (hw >= 0) { - metrics.highWatermark.get(tp).set(hw) - metrics.lag.get(tp).set(hw - nextOffset) - } else { - debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp)) - } - } - override def toString() = "BrokerProxy for %s:%d" format (host, port) - - def start { - if (!thread.isAlive) { - info("Starting " + toString) - thread.setDaemon(true) - thread.setName("Samza BrokerProxy " + thread.getName) - thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler { - override def uncaughtException(t: Thread, e: Throwable) = error("Uncaught exception in broker proxy:", e) - }) - thread.start - } else { - debug("Tried to start an already started broker proxy (%s). Ignoring." format toString) - } - } - - def stop { - info("Shutting down " + toString) - - if (simpleConsumer != null) { - info("closing simple consumer...") - simpleConsumer.close - } - - thread.interrupt - thread.join - } - - private def refreshLatencyMetrics { - nextOffsets.foreach{ - case (topicAndPartition, offset) => { - val latestOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, -1, Request.OrdinaryConsumerId) - trace("latest offset of %s is %s" format (topicAndPartition, latestOffset)) - if (latestOffset >= 0) { - // only update the registered topicAndpartitions - if(metrics.highWatermark.containsKey(topicAndPartition)) { - metrics.highWatermark.get(topicAndPartition).set(latestOffset) - } - if(metrics.lag.containsKey(topicAndPartition)) { - metrics.lag.get(topicAndPartition).set(latestOffset - offset) - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java new file mode 100644 index 0000000..04071c1 --- /dev/null +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -0,0 +1,423 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.samza.system.kafka; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import kafka.common.TopicAndPartition; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class contains a separate thread that reads messages from kafka and puts them into the BlockingEnvelopeMap + * through KafkaSystemConsumer.KafkaConsumerMessageSink object. + * This class is not thread safe. There will be only one instance of this class per KafkaSystemConsumer object. + * We still need some synchronization around kafkaConsumer. See pollConsumer() method for details. + */ +class KafkaConsumerProxy<K, V> { + private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProxy.class); + + private static final int SLEEP_MS_WHILE_NO_TOPIC_PARTITION = 100; + + final Thread consumerPollThread; + private final Consumer<K, V> kafkaConsumer; + private final KafkaSystemConsumer.KafkaConsumerMessageSink sink; + private final KafkaSystemConsumerMetrics kafkaConsumerMetrics; + private final String metricName; + private final String systemName; + private final String clientId; + private final Map<TopicPartition, SystemStreamPartition> topicPartitionToSSP = new HashMap<>(); + private final Map<SystemStreamPartition, MetricName> perPartitionMetrics = new HashMap<>(); + // list of all the SSPs we poll from, with their next(most recently read + 1) offsets correspondingly. + private final Map<SystemStreamPartition, Long> nextOffsets = new ConcurrentHashMap<>(); + // lags behind the high water mark, as reported by the Kafka consumer. 
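+ // (seeded once from endOffsets() in initializeLags(), then refreshed after every poll from the consumer's records-lag metric in populateCurrentLags())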
+ private final Map<SystemStreamPartition, Long> latestLags = new HashMap<>(); + + private volatile boolean isRunning = false; + private volatile Throwable failureCause = null; + private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1); + + KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId, + KafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics, + String metricName) { + + this.kafkaConsumer = kafkaConsumer; + this.systemName = systemName; + this.sink = messageSink; + this.kafkaConsumerMetrics = samzaConsumerMetrics; + this.metricName = metricName; + this.clientId = clientId; + + this.kafkaConsumerMetrics.registerClientProxy(metricName); + + consumerPollThread = new Thread(createProxyThreadRunnable()); + consumerPollThread.setDaemon(true); + consumerPollThread.setName( + "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName); + } + + /** + * Add new partition to the list of polled partitions. + * Must only be called before {@link KafkaConsumerProxy#start} is called. + */ + public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) { + LOG.info(String.format("Adding new topicPartition %s with offset %s to queue for consumer %s", ssp, nextOffset, + this)); + topicPartitionToSSP.put(KafkaSystemConsumer.toTopicPartition(ssp), ssp); //registered SSPs + + // this is an already vetted offset so there is no need to validate it + nextOffsets.put(ssp, nextOffset); + + kafkaConsumerMetrics.setNumTopicPartitions(metricName, nextOffsets.size()); + } + + /** + * Stop this KafkaConsumerProxy and wait for at most {@code timeoutMs}. + * @param timeoutMs maximum time to wait to stop this KafkaConsumerProxy + */ + public void stop(long timeoutMs) { + LOG.info("Shutting down KafkaConsumerProxy poll thread {} for {}", consumerPollThread.getName(), this); + + isRunning = false; + try { + consumerPollThread.join(timeoutMs/2); + // join() may time out; + // in that case we should interrupt the thread and wait again + if (consumerPollThread.isAlive()) { + consumerPollThread.interrupt(); + consumerPollThread.join(timeoutMs/2); + } + } catch (InterruptedException e) { + LOG.warn("Join in KafkaConsumerProxy has failed", e); + consumerPollThread.interrupt(); + } + } + + public void start() { + if (!consumerPollThread.isAlive()) { + LOG.info("Starting KafkaConsumerProxy polling thread for " + this.toString()); + + consumerPollThread.start(); + + // we need to wait until the thread starts + while (!isRunning && failureCause == null) { + try { + consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.info("Ignoring InterruptedException while waiting for consumer poll thread to start.", e); + } + } + } else { + LOG.warn("Tried to start an already started KafkaConsumerProxy ({}). Ignoring.", this.toString()); + } + + if (topicPartitionToSSP.size() == 0) { + String msg = String.format("Cannot start KafkaConsumerProxy without any registered TopicPartitions for %s", systemName); + LOG.error(msg); + throw new SamzaException(msg); + } + } + + boolean isRunning() { + return isRunning; + } + + Throwable getFailureCause() { + return failureCause; + } + + private void initializeLags() { + // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag.
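+ // Note: endOffsets() issues a remote ListOffsets request to the brokers for each partition's log-end offset.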
+ Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitionToSSP.keySet()); + endOffsets.forEach((tp, offset) -> { + SystemStreamPartition ssp = topicPartitionToSSP.get(tp); + long startingOffset = nextOffsets.get(ssp); + // End offsets are the offset of the newest message + 1 + // If the message we are about to consume is < end offset, we are starting with a lag. + long initialLag = endOffsets.get(tp) - startingOffset; + + LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset); + latestLags.put(ssp, initialLag); + sink.setIsAtHighWatermark(ssp, initialLag == 0); + }); + + // initialize lag metrics + refreshLagMetrics(); + } + + // creates a separate thread for getting the messages. + private Runnable createProxyThreadRunnable() { + Runnable runnable = () -> { + isRunning = true; + + try { + consumerPollThreadStartLatch.countDown(); + LOG.info("Starting consumer poll thread {} for system {}", consumerPollThread.getName(), systemName); + initializeLags(); + while (isRunning) { + fetchMessages(); + } + } catch (Throwable throwable) { + LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable); + // KafkaSystemConsumer uses the failureCause to propagate the throwable to the container + failureCause = throwable; + isRunning = false; + } + + if (!isRunning) { + LOG.info("KafkaConsumerProxy for system {} has stopped.", systemName); + } + }; + + return runnable; + } + + private void fetchMessages() { + Set<SystemStreamPartition> sspsToFetch = new HashSet<>(); + for (SystemStreamPartition ssp : nextOffsets.keySet()) { + if (sink.needsMoreMessages(ssp)) { + sspsToFetch.add(ssp); + } + } + LOG.debug("pollConsumer for {} SSPs: {}", sspsToFetch.size(), sspsToFetch); + if (!sspsToFetch.isEmpty()) { + kafkaConsumerMetrics.incClientReads(metricName); + + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response; + + response = pollConsumer(sspsToFetch, 500L); + + // move the responses into the queue + for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) { + List<IncomingMessageEnvelope> envelopes = e.getValue(); + if (envelopes != null) { + moveMessagesToTheirQueue(e.getKey(), envelopes); + } + } + + populateCurrentLags(sspsToFetch); // find current lags for each SSP + } else { // nothing to read + + LOG.debug("No topic/partitions need to be fetched for system {} right now. Sleeping {}ms.", systemName, + SLEEP_MS_WHILE_NO_TOPIC_PARTITION); + + kafkaConsumerMetrics.incClientSkippedFetchRequests(metricName); + + try { + Thread.sleep(SLEEP_MS_WHILE_NO_TOPIC_PARTITION); + } catch (InterruptedException e) { + LOG.warn("Sleep in fetchMessages was interrupted"); + } + } + refreshLagMetrics(); + } + + // the actual polling of the messages from Kafka + private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollConsumer( + Set<SystemStreamPartition> systemStreamPartitions, long timeoutMs) { + + // Since we need to poll only from some subset of TopicPartitions (passed as the argument), + // we need to pause the rest.
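+ // Note: pause()/resume() only control which assigned partitions the next poll() may fetch from; they do not change the assignment or reset fetch positions.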
+ List<TopicPartition> topicPartitionsToPause = new ArrayList<>(); + List<TopicPartition> topicPartitionsToPoll = new ArrayList<>(); + + for (Map.Entry<TopicPartition, SystemStreamPartition> e : topicPartitionToSSP.entrySet()) { + TopicPartition tp = e.getKey(); + SystemStreamPartition ssp = e.getValue(); + if (systemStreamPartitions.contains(ssp)) { + topicPartitionsToPoll.add(tp); // consume + } else { + topicPartitionsToPause.add(tp); // ignore + } + } + + ConsumerRecords<K, V> records; + try { + // Synchronize, in case the consumer is used in some other thread (metadata or something else) + synchronized (kafkaConsumer) { + // Since we are not polling from ALL the subscribed topics, we need to "change" the subscription temporarily + kafkaConsumer.pause(topicPartitionsToPause); + kafkaConsumer.resume(topicPartitionsToPoll); + records = kafkaConsumer.poll(timeoutMs); + } + } catch (Exception e) { + // we may get InvalidOffsetException | AuthorizationException | KafkaException exceptions; + // we just log and rethrow, letting them propagate up the stack. + LOG.error("Caught a Kafka exception in pollConsumer for system " + systemName, e); + throw e; + } + + return processResults(records); + } + + private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> processResults(ConsumerRecords<K, V> records) { + if (records == null) { + throw new SamzaException("Received null 'records' after polling consumer in KafkaConsumerProxy " + this); + } + + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new HashMap<>(records.count()); + // Parse the returned records and convert them into the IncomingMessageEnvelope. + for (ConsumerRecord<K, V> record : records) { + int partition = record.partition(); + String topic = record.topic(); + TopicPartition tp = new TopicPartition(topic, partition); + + updateMetrics(record, tp); + + SystemStreamPartition ssp = topicPartitionToSSP.get(tp); + List<IncomingMessageEnvelope> messages = results.get(ssp); + if (messages == null) { + messages = new ArrayList<>(); + results.put(ssp, messages); + } + + K key = record.key(); + Object value = record.value(); + IncomingMessageEnvelope imEnvelope = + new IncomingMessageEnvelope(ssp, String.valueOf(record.offset()), key, value, getRecordSize(record)); + messages.add(imEnvelope); + } + if (LOG.isDebugEnabled()) { + LOG.debug("# records per SSP:"); + for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : results.entrySet()) { + List<IncomingMessageEnvelope> list = e.getValue(); + LOG.debug(e.getKey() + " = " + ((list == null) ? 0 : list.size())); + } + } + + return results; + } + + private int getRecordSize(ConsumerRecord<K, V> r) { + int keySize = (r.key() == null) ? 0 : r.serializedKeySize(); + return keySize + r.serializedValueSize(); + } + + private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) { + TopicAndPartition tap = KafkaSystemConsumer.toTopicAndPartition(tp); + SystemStreamPartition ssp = new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition())); + + Long lag = latestLags.get(ssp); + if (lag == null) { + throw new SamzaException("Unknown/unregistered ssp in latestLags.
ssp=" + ssp + "; system=" + systemName); + } + long currentSSPLag = lag.longValue(); // lag between the current offset and the highwatermark + if (currentSSPLag < 0) { + return; + } + + long recordOffset = r.offset(); + long highWatermark = recordOffset + currentSSPLag; // derived value for the highwatermark + + int size = getRecordSize(r); + kafkaConsumerMetrics.incReads(tap); + kafkaConsumerMetrics.incBytesReads(tap, size); + kafkaConsumerMetrics.setOffsets(tap, recordOffset); + kafkaConsumerMetrics.incClientBytesReads(metricName, size); + kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark); + } + + private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List<IncomingMessageEnvelope> envelopes) { + long nextOffset = nextOffsets.get(ssp); + + for (IncomingMessageEnvelope env : envelopes) { + sink.addMessage(ssp, env); // move message to the BlockingEnvelopeMap's queue + + LOG.trace("IncomingMessageEnvelope. got envelope with offset:{} for ssp={}", env.getOffset(), ssp); + nextOffset = Long.valueOf(env.getOffset()) + 1; + } + + nextOffsets.put(ssp, nextOffset); + } + + // The only way to figure out lag for the KafkaConsumer is to look at the metrics after each poll() call. + // One of the metrics (records-lag) shows how far behind the HighWatermark the consumer is. + // This method populates the lag information for each SSP into latestLags member variable. + private void populateCurrentLags(Set<SystemStreamPartition> ssps) { + + Map<MetricName, ? extends Metric> consumerMetrics = kafkaConsumer.metrics(); + + // populate the MetricNames first time + if (perPartitionMetrics.isEmpty()) { + HashMap<String, String> tags = new HashMap<>(); + tags.put("client-id", clientId); // this is required by the KafkaConsumer to get the metrics + + for (SystemStreamPartition ssp : ssps) { + TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp); + perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)); + } + } + + for (SystemStreamPartition ssp : ssps) { + MetricName mn = perPartitionMetrics.get(ssp); + Metric currentLagMetric = consumerMetrics.get(mn); + + // High watermark is fixed to be the offset of last available message, + // so the lag is now at least 0, which is the same as Samza's definition. + // If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling. + long currentLag = (currentLagMetric != null) ? 
(long) currentLagMetric.value() : -1L; + latestLags.put(ssp, currentLag); + + // calls the setIsAtHead for the BlockingEnvelopeMap + sink.setIsAtHighWatermark(ssp, currentLag == 0); + } + } + + private void refreshLagMetrics() { + for (Map.Entry<SystemStreamPartition, Long> e : nextOffsets.entrySet()) { + SystemStreamPartition ssp = e.getKey(); + Long offset = e.getValue(); + TopicAndPartition tp = new TopicAndPartition(ssp.getStream(), ssp.getPartition().getPartitionId()); + Long lag = latestLags.get(ssp); + LOG.trace("Latest offset of {} is {}; lag = {}", ssp, offset, lag); + if (lag != null && offset != null && lag >= 0) { + long streamEndOffset = offset.longValue() + lag.longValue(); + // update the metrics + kafkaConsumerMetrics.setHighWatermarkValue(tp, streamEndOffset); + kafkaConsumerMetrics.setLagValue(tp, lag.longValue()); + } + } + } + + @Override + public String toString() { + return String.format("consumerProxy-%s-%s", systemName, clientId); + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java new file mode 100644 index 0000000..10ce274 --- /dev/null +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java @@ -0,0 +1,371 @@ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.samza.system.kafka; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import kafka.common.TopicAndPartition; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.config.KafkaConsumerConfig; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.BlockingEnvelopeMap; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + + +public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements SystemConsumer { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemConsumer.class); + + private static final long FETCH_THRESHOLD = 50000; + private static final long FETCH_THRESHOLD_BYTES = -1L; + + private final Consumer<K, V> kafkaConsumer; + private final String systemName; + private final String clientId; + private final AtomicBoolean stopped = new AtomicBoolean(false); + private final AtomicBoolean started = new AtomicBoolean(false); + private final Config config; + private final boolean fetchThresholdBytesEnabled; + private final KafkaSystemConsumerMetrics metrics; + + // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap. + final KafkaConsumerMessageSink messageSink; + + // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates + // the BlockingEnvelopeMap's buffers. + private final KafkaConsumerProxy proxy; + + // Keep registration data until start: the mapping between registered SSPs and TopicPartitions, and their offsets + final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>(); + final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>(); + + long perPartitionFetchThreshold; + long perPartitionFetchThresholdBytes; + + /** + * Create a KafkaSystemConsumer for the provided {@code systemName}. + * @param kafkaConsumer kafka Consumer object to read from + * @param systemName system name for which we create the consumer + * @param config application config + * @param clientId client id to use in the kafka consumer + * @param metrics metrics for this KafkaSystemConsumer + * @param clock system clock + */ + public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId, + KafkaSystemConsumerMetrics metrics, Clock clock) { + + super(metrics.registry(), clock, metrics.getClass().getName()); + + this.kafkaConsumer = kafkaConsumer; + this.clientId = clientId; + this.systemName = systemName; + this.config = config; + this.metrics = metrics; + + fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName); + + // create a sink for passing the messages between the proxy and the consumer + messageSink = new KafkaConsumerMessageSink(); + + // Create the proxy to do the actual message reading.
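+ // Editor's note (not part of the original patch): the proxy owns the only thread that calls + // kafkaConsumer.poll(). It hands deserialized IncomingMessageEnvelopes to messageSink, which queues + // them in the BlockingEnvelopeMap buffers that this class's poll() later drains on the container + // thread. Because the underlying kafkaConsumer is shared between the two threads, every direct use + // of it in this class is guarded by synchronized (kafkaConsumer).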
+ String metricName = systemName; + proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, metrics, metricName); + LOG.info("{}: Created KafkaConsumerProxy {}", this, proxy); + } + + /** + * Create the internal kafka consumer object, which will be used in the Proxy. + * @param systemName system name for which we create the consumer + * @param clientId client id to use in the kafka client + * @param config config + * @return kafka consumer object + */ + public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String systemName, String clientId, Config config) { + + // extract kafka client configs + KafkaConsumerConfig consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId); + + LOG.info("{}: KafkaClient properties {}", systemName, consumerConfig); + + return new KafkaConsumer<>(consumerConfig); + } + + @Override + public void start() { + if (!started.compareAndSet(false, true)) { + LOG.warn("{}: Attempting to start the consumer more than once.", this); + return; + } + if (stopped.get()) { + LOG.error("{}: Attempting to start a stopped consumer", this); + return; + } + // initialize the subscriptions for all the registered TopicPartitions + startSubscription(); + // needs to be called after all the registrations are completed + setFetchThresholds(); + + startConsumer(); + LOG.info("{}: Consumer started", this); + } + + private void startSubscription() { + // subscribe to all the registered TopicPartitions + LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet()); + try { + synchronized (kafkaConsumer) { + // we are using assign (and not subscribe), so we need to specify both topic and partition + kafkaConsumer.assign(topicPartitionsToSSP.keySet()); + } + } catch (Exception e) { + throw new SamzaException("Consumer subscription failed for " + this, e); + } + } + + /** + * Set the offsets to start from. + * Register the TopicPartitions with the proxy. + * Start the proxy. + */ + void startConsumer() { + // set the offset for each TopicPartition + if (topicPartitionsToOffset.isEmpty()) { + LOG.error("{}: Consumer is not subscribed to any SSPs", this); + } + + topicPartitionsToOffset.forEach((tp, startingOffsetString) -> { + long startingOffset = Long.valueOf(startingOffsetString); + + try { + synchronized (kafkaConsumer) { + kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value + } + } catch (Exception e) { + // all recoverable exceptions are handled by the client. + // if we get here there is nothing left to do but bail out. + String msg = + String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, tp); + LOG.error(msg, e); + throw new SamzaException(msg, e); + } + + LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString); + + // add the partition to the proxy + proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset); + }); + + // start the proxy thread + if (proxy != null && !proxy.isRunning()) { + LOG.info("{}: Starting proxy {}", this, proxy); + proxy.start(); + } + } + + private void setFetchThresholds() { + // get the thresholds, and set defaults if not defined.
+ KafkaConfig kafkaConfig = new KafkaConfig(config); + + Option<String> fetchThresholdOption = kafkaConfig.getConsumerFetchThreshold(systemName); + long fetchThreshold = FETCH_THRESHOLD; + if (fetchThresholdOption.isDefined()) { + fetchThreshold = Long.valueOf(fetchThresholdOption.get()); + } + + Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName); + long fetchThresholdBytes = FETCH_THRESHOLD_BYTES; + if (fetchThresholdBytesOption.isDefined()) { + fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get()); + } + + int numPartitions = topicPartitionsToSSP.size(); + if (numPartitions != topicPartitionsToOffset.size()) { + throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()"); + } + + if (numPartitions > 0) { + perPartitionFetchThreshold = fetchThreshold / numPartitions; + if (fetchThresholdBytesEnabled) { + // currently this feature cannot be enabled, because we do not have the size of the messages available. + // messages get double buffered, hence divide by 2 + perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numPartitions; + } + } + LOG.info("{}: fetchThresholdBytes={}; fetchThreshold={}; numPartitions={}; perPartitionFetchThreshold={}; perPartitionFetchThresholdBytes(0 if disabled)={}", + this, fetchThresholdBytes, fetchThreshold, numPartitions, perPartitionFetchThreshold, perPartitionFetchThresholdBytes); + } + + @Override + public void stop() { + if (!stopped.compareAndSet(false, true)) { + LOG.warn("{}: Attempting to stop an already stopped consumer.", this); + return; + } + + LOG.info("{}: Stopping Samza kafkaConsumer", this); + + // stop the proxy (with 1 minute timeout) + if (proxy != null) { + LOG.info("{}: Stopping proxy {}", this, proxy); + proxy.stop(TimeUnit.SECONDS.toMillis(60)); + } + + try { + synchronized (kafkaConsumer) { + LOG.info("{}: Closing kafkaSystemConsumer {}", this, kafkaConsumer); + kafkaConsumer.close(); + } + } catch (Exception e) { + LOG.warn("{}: Failed to stop KafkaSystemConsumer.", this, e); + } + } + + /** + * Record the ssp and the offset. Do not submit them to the consumer yet. + * @param systemStreamPartition ssp to register + * @param offset offset to register with + */ + @Override + public void register(SystemStreamPartition systemStreamPartition, String offset) { + if (started.get()) { + String msg = String.format("%s: Trying to register partition after consumer has been started. ssp=%s", this, + systemStreamPartition); + throw new SamzaException(msg); + } + + if (!systemStreamPartition.getSystem().equals(systemName)) { + LOG.warn("{}: Ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition); + return; + } + LOG.info("{}: Registering ssp = {} with offset {}", this, systemStreamPartition, offset); + + super.register(systemStreamPartition, offset); + + TopicPartition tp = toTopicPartition(systemStreamPartition); + + topicPartitionsToSSP.put(tp, systemStreamPartition); + + String existingOffset = topicPartitionsToOffset.get(tp); + // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages. + if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) { + topicPartitionsToOffset.put(tp, offset); + } + + metrics.registerTopicAndPartition(toTopicAndPartition(tp)); + } + + /** + * Compare two String offsets. + * Note: KafkaSystemAdmin has a method that does this, but using it would require instantiating a SystemAdmin for every consumer.
+ * @return see {@link Long#compareTo(Long)} + */ + private static int compareOffsets(String offset1, String offset2) { + return Long.valueOf(offset1).compareTo(Long.valueOf(offset2)); + } + + @Override + public String toString() { + return String.format("%s:%s", systemName, clientId); + } + + @Override + public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll( + Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException { + + // check if the proxy is running + if (!proxy.isRunning()) { + stop(); + String message = String.format("%s: KafkaConsumerProxy has stopped.", this); + throw new SamzaException(message, proxy.getFailureCause()); + } + + return super.poll(systemStreamPartitions, timeout); + } + + /** + * Convert from TopicPartition to TopicAndPartition. + */ + public static TopicAndPartition toTopicAndPartition(TopicPartition tp) { + return new TopicAndPartition(tp.topic(), tp.partition()); + } + + /** + * Convert from SystemStreamPartition to TopicPartition. + */ + public static TopicPartition toTopicPartition(SystemStreamPartition ssp) { + return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId()); + } + + /** + * Return the system name for this consumer. + * @return system name + */ + public String getSystemName() { + return systemName; + } + + public class KafkaConsumerMessageSink { + + public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) { + setIsAtHead(ssp, isAtHighWatermark); + } + + boolean needsMoreMessages(SystemStreamPartition ssp) { + LOG.debug("{}: needsMoreMessages for SSP {}: fetchLimitByBytes enabled={}; messagesSizeInQueue={} (limit={}); " + + "messagesNumInQueue={} (limit={}).", this, ssp, fetchThresholdBytesEnabled, + getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp), + perPartitionFetchThreshold); + + if (fetchThresholdBytesEnabled) { + return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes; + } else { + return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold; + } + } + + void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) { + LOG.trace("{}: Incoming message ssp = {}: envelope = {}.", this, ssp, envelope); + + try { + put(ssp, envelope); + } catch (InterruptedException e) { + throw new SamzaException( + String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s", this, + envelope.getOffset(), ssp), e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala deleted file mode 100644 index fd84c4a..0000000 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ /dev/null @@ -1,309 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.system.kafka - -import kafka.common.TopicAndPartition -import org.apache.samza.util.Logging -import kafka.message.Message -import kafka.message.MessageAndOffset -import org.apache.samza.Partition -import org.apache.kafka.common.utils.Utils -import org.apache.samza.util.Clock -import kafka.serializer.DefaultDecoder -import kafka.serializer.Decoder -import org.apache.samza.util.BlockingEnvelopeMap -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.system.IncomingMessageEnvelope -import kafka.consumer.ConsumerConfig -import org.apache.samza.util.TopicMetadataStore -import kafka.api.PartitionMetadata -import kafka.api.TopicMetadata -import org.apache.samza.util.ExponentialSleepStrategy -import java.util.concurrent.ConcurrentHashMap -import scala.collection.JavaConverters._ -import org.apache.samza.system.SystemAdmin - -object KafkaSystemConsumer { - - // Approximate additional shallow heap overhead per message in addition to the raw bytes - // received from Kafka 4 + 64 + 4 + 4 + 4 = 80 bytes overhead. - // As this overhead is a moving target, and not very large - // compared to the message size its being ignore in the computation for now. - val MESSAGE_SIZE_OVERHEAD = 4 + 64 + 4 + 4 + 4; - - def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = { - val topic = systemStreamPartition.getStream - val partitionId = systemStreamPartition.getPartition.getPartitionId - TopicAndPartition(topic, partitionId) - } -} - -/** - * Maintain a cache of BrokerProxies, returning the appropriate one for the - * requested topic and partition. - */ -private[kafka] class KafkaSystemConsumer( - systemName: String, - systemAdmin: SystemAdmin, - metrics: KafkaSystemConsumerMetrics, - metadataStore: TopicMetadataStore, - clientId: String, - timeout: Int = ConsumerConfig.ConsumerTimeoutMs, - bufferSize: Int = ConsumerConfig.SocketBufferSize, - fetchSize: StreamFetchSizes = new StreamFetchSizes, - consumerMinSize: Int = ConsumerConfig.MinFetchBytes, - consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs, - - /** - * Defines a low water mark for how many messages we buffer before we start - * executing fetch requests against brokers to get more messages. This value - * is divided equally among all registered SystemStreamPartitions. For - * example, if fetchThreshold is set to 50000, and there are 50 - * SystemStreamPartitions registered, then the per-partition threshold is - * 1000. As soon as a SystemStreamPartition's buffered message count drops - * below 1000, a fetch request will be executed to get more data for it. - * - * Increasing this parameter will decrease the latency between when a queue - * is drained of messages and when new messages are enqueued, but also leads - * to an increase in memory usage since more messages will be held in memory. - */ - fetchThreshold: Int = 50000, - /** - * Defines a low water mark for how many bytes we buffer before we start - * executing fetch requests against brokers to get more messages. 
This - * value is divided by 2 because the messages are buffered twice, once in - * KafkaConsumer and then in SystemConsumers. This value - * is divided equally among all registered SystemStreamPartitions. - * However this is a soft limit per partition, as the - * bytes are cached at the message boundaries, and the actual usage can be - * 1000 bytes + size of max message in the partition for a given stream. - * The bytes if the size of the bytebuffer in Message. Hence, the - * Object overhead is not taken into consideration. In this codebase - * it seems to be quite small. Hence, even for 500000 messages this is around 4MB x 2 = 8MB, - * which is not considerable. - * - * For example, - * if fetchThresholdBytes is set to 100000 bytes, and there are 50 - * SystemStreamPartitions registered, then the per-partition threshold is - * (100000 / 2) / 50 = 1000 bytes. - * As this is a soft limit, the actual usage can be 1000 bytes + size of max message. - * As soon as a SystemStreamPartition's buffered messages bytes drops - * below 1000, a fetch request will be executed to get more data for it. - * - * Increasing this parameter will decrease the latency between when a queue - * is drained of messages and when new messages are enqueued, but also leads - * to an increase in memory usage since more messages will be held in memory. - * - * The default value is -1, which means this is not used. When the value - * is > 0, then the fetchThreshold which is count based is ignored. - */ - fetchThresholdBytes: Long = -1, - /** - * if(fetchThresholdBytes > 0) true else false - */ - fetchLimitByBytesEnabled: Boolean = false, - offsetGetter: GetOffset = new GetOffset("fail"), - deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]], - keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]], - retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, - clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap( - metrics.registry, - new Clock { - def currentTimeMillis = clock() - }, - classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging { - - type HostPort = (String, Int) - val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]() - val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]().asScala - var perPartitionFetchThreshold = fetchThreshold - var perPartitionFetchThresholdBytes = 0L - - def start() { - if (topicPartitionsAndOffsets.size > 0) { - perPartitionFetchThreshold = fetchThreshold / topicPartitionsAndOffsets.size - // messages get double buffered, hence divide by 2 - if(fetchLimitByBytesEnabled) { - perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / topicPartitionsAndOffsets.size - } - } - - systemAdmin.start() - refreshBrokers - } - - override def register(systemStreamPartition: SystemStreamPartition, offset: String) { - super.register(systemStreamPartition, offset) - - val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition) - val existingOffset = topicPartitionsAndOffsets.getOrElseUpdate(topicAndPartition, offset) - // register the older offset in the consumer - if (systemAdmin.offsetComparator(existingOffset, offset) >= 0) { - topicPartitionsAndOffsets.replace(topicAndPartition, offset) - } - - metrics.registerTopicAndPartition(KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)) - } - - def stop() { - systemAdmin.stop() - 
brokerProxies.values.foreach(_.stop) - } - - protected def createBrokerProxy(host: String, port: Int): BrokerProxy = { - info("Creating new broker proxy for host: %s and port: %s" format(host, port)) - new BrokerProxy(host, port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter) - } - - protected def getPartitionMetadata(topicMetadata: TopicMetadata, partition: Int): Option[PartitionMetadata] = { - topicMetadata.partitionsMetadata.find(_.partitionId == partition) - } - - protected def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = { - // Whatever we do, we can't say Broker, even though we're - // manipulating it here. Broker is a private type and Scala doesn't seem - // to care about that as long as you don't explicitly declare its type. - val brokerOption = partitionMetadata.flatMap(_.leader) - - brokerOption match { - case Some(broker) => Some(broker.host, broker.port) - case _ => None - } - } - - def refreshBrokers { - var tpToRefresh = topicPartitionsAndOffsets.keySet.toList - info("Refreshing brokers for: %s" format topicPartitionsAndOffsets) - retryBackoff.run( - loop => { - val topics = tpToRefresh.map(_.topic).toSet - val topicMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics)) - - // addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions. - // This avoids trying to re-add the same topic partition repeatedly - def refresh() = { - val head = tpToRefresh.head - // refreshBrokers can be called from abdicate and refreshDropped, - // both of which are triggered from BrokerProxy threads. To prevent - // accidentally creating multiple objects for the same broker, or - // accidentally not updating the topicPartitionsAndOffsets variable, - // we need to lock. - this.synchronized { - // Check if we still need this TopicAndPartition inside the - // critical section. If we don't, then notAValidEvent it. - topicPartitionsAndOffsets.get(head) match { - case Some(nextOffset) => - val partitionMetadata = getPartitionMetadata(topicMetadata(head.topic), head.partition) - getLeaderHostPort(partitionMetadata) match { - case Some((host, port)) => - debug("Got partition metadata for %s: %s" format(head, partitionMetadata.get)) - val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port)) - brokerProxy.addTopicPartition(head, Option(nextOffset)) - brokerProxy.start - debug("Claimed topic-partition (%s) for (%s)".format(head, brokerProxy)) - topicPartitionsAndOffsets -= head - case None => info("No metadata available for: %s. Will try to refresh and add to a consumer thread later." format head) - } - case _ => debug("Ignoring refresh for %s because we already added it from another thread." format head) - } - } - tpToRefresh.tail - } - - while (!tpToRefresh.isEmpty) { - tpToRefresh = refresh() - } - - loop.done - }, - - (exception, loop) => { - warn("While refreshing brokers for %s: %s. Retrying." 
format (tpToRefresh.head, exception)) - debug("Exception detail:", exception) - }) - } - - val sink = new MessageSink { - var lastDroppedRefresh = clock() - - def refreshDropped() { - if (topicPartitionsAndOffsets.size > 0 && clock() - lastDroppedRefresh > 10000) { - refreshBrokers - lastDroppedRefresh = clock() - } - } - - def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) { - setIsAtHead(toSystemStreamPartition(tp), isAtHighWatermark) - } - - def needsMoreMessages(tp: TopicAndPartition) = { - if(fetchLimitByBytesEnabled) { - getMessagesSizeInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThresholdBytes - } else { - getNumMessagesInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThreshold - } - } - - def getMessageSize(message: Message): Integer = { - message.size + KafkaSystemConsumer.MESSAGE_SIZE_OVERHEAD - } - - def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = { - trace("Incoming message %s: %s." format (tp, msg)) - - val systemStreamPartition = toSystemStreamPartition(tp) - val isAtHead = highWatermark == msg.offset - val offset = msg.offset.toString - val key = if (msg.message.key != null) { - keyDeserializer.fromBytes(Utils.readBytes(msg.message.key)) - } else { - null - } - val message = if (!msg.message.isNull) { - deserializer.fromBytes(Utils.readBytes(msg.message.payload)) - } else { - null - } - - if(fetchLimitByBytesEnabled ) { - val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message)) - ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L) - put(systemStreamPartition, ime) - } else { - val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message) - ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L) - put(systemStreamPartition, ime) - } - - setIsAtHead(systemStreamPartition, isAtHead) - } - - def abdicate(tp: TopicAndPartition, nextOffset: Long) { - info("Abdicating for %s" format (tp)) - topicPartitionsAndOffsets += tp -> nextOffset.toString - refreshBrokers - } - - private def toSystemStreamPartition(tp: TopicAndPartition) = { - new SystemStreamPartition(systemName, tp.topic, new Partition(tp.partition)) - } - } -}
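For orientation, here is a minimal, hypothetical wiring sketch of the new consumer path that replaces the BrokerProxy flow deleted above. It is not part of the patch: the config contents, client id, topic, starting offset, and the MapConfig / MetricsRegistryMap / method-reference clock choices are illustrative assumptions (including the assumed (systemName, registry) constructor of KafkaSystemConsumerMetrics); only the KafkaSystemConsumer calls themselves (getKafkaConsumerImpl, register, start, poll, stop) come from the code added in this commit.

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.kafka.KafkaSystemConsumer;
import org.apache.samza.system.kafka.KafkaSystemConsumerMetrics;

public class NewConsumerSketch {
  public static void main(String[] args) throws InterruptedException {
    String systemName = "kafka";
    String clientId = "samza-consumer-sketch"; // illustrative client id

    // Minimal config; a real job would pass the full job config here.
    Map<String, String> cfg = new HashMap<>();
    cfg.put("systems.kafka.consumer.bootstrap.servers", "localhost:9092"); // assumed config key
    Config config = new MapConfig(cfg);

    // Factory for the underlying high-level client, added by this commit.
    KafkaConsumer<byte[], byte[]> kafkaConsumer =
        KafkaSystemConsumer.getKafkaConsumerImpl(systemName, clientId, config);

    KafkaSystemConsumerMetrics metrics =
        new KafkaSystemConsumerMetrics(systemName, new MetricsRegistryMap());

    // Clock is a single-method interface, so a method reference suffices here.
    KafkaSystemConsumer<byte[], byte[]> consumer = new KafkaSystemConsumer<>(
        kafkaConsumer, systemName, config, clientId, metrics, System::currentTimeMillis);

    // All registrations must happen before start(): register() records the SSP and keeps
    // the older of any duplicate offsets; start() then assigns, seeks, and starts the proxy.
    SystemStreamPartition ssp = new SystemStreamPartition(systemName, "my-topic", new Partition(0));
    consumer.register(ssp, "0"); // "0" is the 'upcoming' offset to start from
    consumer.start();

    // poll() drains the BlockingEnvelopeMap buffers that KafkaConsumerProxy fills.
    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes =
        consumer.poll(Collections.singleton(ssp), 500);
    for (IncomingMessageEnvelope env : envelopes.getOrDefault(ssp, Collections.emptyList())) {
      System.out.println(env.getSystemStreamPartition() + " @ " + env.getOffset());
    }

    consumer.stop();
  }
}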

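Two pieces of arithmetic in the new code are easy to gloss over: how setFetchThresholds() splits the job-wide fetch budgets across partitions, and how KafkaConsumerProxy reconstructs the high watermark from the records-lag metric. The following self-contained sketch works through both with made-up numbers; all values are illustrative except the 50000 default message threshold carried by FETCH_THRESHOLD.

public class ThresholdAndLagMath {
  public static void main(String[] args) {
    // setFetchThresholds(): the job-wide budgets are split evenly across registered
    // partitions; the byte budget is additionally halved because messages are
    // buffered twice (once in the KafkaConsumer, once in SystemConsumers).
    long fetchThreshold = 50000;       // default message-count threshold (FETCH_THRESHOLD)
    long fetchThresholdBytes = 100000; // example byte threshold (illustrative)
    int numPartitions = 50;            // illustrative partition count

    long perPartitionFetchThreshold = fetchThreshold / numPartitions;                 // = 1000 messages
    long perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numPartitions; // = 1000 bytes

    // KafkaConsumerProxy: the high-level consumer does not expose the high watermark
    // directly, so it is derived from the last record offset plus the records-lag metric.
    long recordOffset = 41999; // offset of the record just read (illustrative)
    long currentLag = 8000;    // records-lag metric observed after poll() (illustrative)
    long highWatermark = recordOffset + currentLag; // = 49999

    // sink.setIsAtHighWatermark(ssp, lag == 0) is what flips BlockingEnvelopeMap's isAtHead.
    boolean isAtHead = currentLag == 0;

    System.out.printf("perPartition: %d msgs / %d bytes; highWatermark=%d; isAtHead=%b%n",
        perPartitionFetchThreshold, perPartitionFetchThresholdBytes, highWatermark, isAtHead);
  }
}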