SAMZA-1868: Create new SamzaAdmin for Kafka

This pull request is a copy of #647 (which got garbled). It already addresses all the comments brought up in the other request.
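For reviewers skimming the diff: the behavioral heart of the new KafkaConsumerConfig is the translation of Samza's legacy auto-offset-reset values into Kafka's. Below is a minimal, self-contained sketch of that mapping (illustrative only; the class and method names here are hypothetical, and the real logic is getAutoOffsetResetValue further down in the diff):

    // Samza's legacy "largest"/"smallest" map to Kafka's "latest"/"earliest";
    // Kafka-native values pass through; anything else, including null,
    // falls back to Kafka's default of "latest".
    public class OffsetResetSketch {  // hypothetical name, not part of this patch
      static String translate(String samzaValue) {
        if (samzaValue == null) {
          return "latest";
        }
        switch (samzaValue) {
          case "earliest":
          case "latest":
          case "none":
            return samzaValue;   // already a valid Kafka value
          case "largest":
            return "latest";
          case "smallest":
            return "earliest";
          default:
            return "latest";     // unrecognized value, use Kafka default
        }
      }

      public static void main(String[] args) {
        for (String v : new String[] {null, "largest", "smallest", "none", "bogus"}) {
          System.out.println(v + " -> " + translate(v));
        }
        // prints: null -> latest, largest -> latest, smallest -> earliest,
        //         none -> none, bogus -> latest
      }
    }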
Author: Boris S <[email protected]>
Author: Boris S <[email protected]>
Author: Boris Shkolnik <[email protected]>

Reviewers: Shanthoosh Venkatraman <[email protected]>, Prateek Maheshwari <[email protected]>

Closes #662 from sborya/NewConsumerAdmin2

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/63d33fa0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/63d33fa0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/63d33fa0

Branch: refs/heads/master
Commit: 63d33fa0617488d25a0d3fb061423271392d20f6
Parents: 3c78e06
Author: Boris S <[email protected]>
Authored: Thu Oct 11 14:26:51 2018 -0700
Committer: Boris S <[email protected]>
Committed: Thu Oct 11 14:26:51 2018 -0700

----------------------------------------------------------------------
 .../samza/application/ApplicationUtil.java      |   1 -
 .../org/apache/samza/config/SystemConfig.scala  |   2 +-
 .../samza/config/KafkaConsumerConfig.java       | 194 ++++
 .../samza/system/kafka/KafkaSystemAdmin.java    | 665 +++++++++++++++
 .../samza/system/kafka/KafkaSystemConsumer.java | 366 ++++++++++
 .../org/apache/samza/config/KafkaConfig.scala   |   5 +
 .../samza/config/KafkaConsumerConfig.java       | 201 ------
 .../samza/system/kafka/KafkaConsumerProxy.java  |   2 +
 .../samza/system/kafka/KafkaSystemAdmin.scala   | 608 -----------------
 .../kafka/KafkaSystemAdminUtilsScala.scala      | 192 ++++++
 .../samza/system/kafka/KafkaSystemConsumer.java | 371 -----------
 .../samza/system/kafka/KafkaSystemFactory.scala |  63 +-
 .../scala/org/apache/samza/util/KafkaUtil.scala |  11 -
 .../samza/config/TestKafkaConsumerConfig.java   |  60 +-
 .../system/kafka/TestKafkaSystemAdminJava.java  | 185 ++++--
 .../kafka/TestKafkaSystemAdminWithMock.java     | 317 +++++++++
 .../system/kafka/TestKafkaSystemConsumer.java   | 225 +++++++
 .../kafka/TestKafkaSystemConsumerMetrics.java   | 109 +++
 .../system/kafka/TestKafkaSystemAdmin.scala     | 109 ++-
 .../system/kafka/TestKafkaSystemConsumer.java   | 220 ------
 .../operator/TestRepartitionJoinWindowApp.java  |  18 +-
 .../AbstractIntegrationTestHarness.scala        |  60 +-
 22 files changed, 2366 insertions(+), 1618 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
index b39ad3c..f779619 100644
--- a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
@@ -59,5 +59,4 @@ public class ApplicationUtil {
     }
     return new LegacyTaskApplication(taskClassOption.get());
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index bebdbd8..00e65a7 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -50,7 +50,7 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging
{ def getDefaultSystemOffset(systemName: String) = getOption(SystemConfig.CONSUMER_OFFSET_DEFAULT format (systemName)) - def deleteCommittedMessages(systemName: String) = getOption(SystemConfig.DELETE_COMMITTED_MESSAGES format (systemName)) + def deleteCommittedMessages(systemName: String) = getBoolean(SystemConfig.DELETE_COMMITTED_MESSAGES format (systemName), false) /** * Returns a list of all system names from the config file. Useful for http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..ad17e82 --- /dev/null +++ b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java @@ -0,0 +1,194 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.samza.config; + +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.samza.SamzaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + + +/** + * The configuration class for KafkaConsumer + */ +public class KafkaConsumerConfig extends HashMap<String, Object> { + + public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class); + + public static final String ZOOKEEPER_CONNECT = "zookeeper.connect"; + + private final String systemName; + /* + * By default, KafkaConsumer will fetch some big number of available messages for all the partitions. + * This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll(). + */ + static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100"; + + private KafkaConsumerConfig(Map<String, Object> props, String systemName) { + super(props); + this.systemName = systemName; + } + + /** + * Create kafka consumer configs, based on the subset of global configs. 
+ * @param config application config + * @param systemName system name + * @param clientId client id provided by the caller + * @return KafkaConsumerConfig + */ + public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) { + + Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true); + + final String groupId = createConsumerGroupId(config); + + Map<String, Object> consumerProps = new HashMap<>(); + consumerProps.putAll(subConf); + + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); + + // These are values we enforce in Samza, and they cannot be overwritten. + + // Disable consumer auto-commit because Samza controls commits + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + // Translate Samza config value to Kafka config value + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))); + + // if consumer bootstrap servers are not configured, get them from the producer configs + if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + String bootstrapServers = + config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + if (StringUtils.isEmpty(bootstrapServers)) { + throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for " + systemName); + } + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } + + // Always use default partition assignment strategy. Do not allow override. + consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); + + // the consumer is fully typed, and deserialization can be too.
But in case it is not provided we should + // default to byte[] + if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { + LOG.info("setting key deserialization for the consumer (for system {}) to ByteArrayDeserializer", systemName); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + } + if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { + LOG.info("setting value deserialization for the consumer (for system {}) to ByteArrayDeserializer", systemName); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + } + + // Set the default max poll records if no value was provided + consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS); + + return new KafkaConsumerConfig(consumerProps, systemName); + } + + public String getClientId() { + String clientId = (String) get(ConsumerConfig.CLIENT_ID_CONFIG); + if (StringUtils.isBlank(clientId)) { + throw new SamzaException("client Id is not set for consumer for system=" + systemName); + } + return clientId; + } + + // group id should be unique per job + static String createConsumerGroupId(Config config) { + Pair<String, String> jobNameId = getJobNameAndId(config); + + return String.format("%s-%s", jobNameId.getLeft(), jobNameId.getRight()); + } + + // client id should be unique per job + public static String createClientId(String prefix, Config config) { + + Pair<String, String> jobNameId = getJobNameAndId(config); + String jobName = jobNameId.getLeft(); + String jobId = jobNameId.getRight(); + return String.format("%s-%s-%s", prefix.replaceAll("\\W", "_"), jobName.replaceAll("\\W", "_"), + jobId.replaceAll("\\W", "_")); + } + + public static Pair<String, String> getJobNameAndId(Config config) { + JobConfig jobConfig = new JobConfig(config); + Option jobNameOption = jobConfig.getName(); + if (jobNameOption.isEmpty()) { + throw new ConfigException("Missing job name"); + } + String jobName = (String) jobNameOption.get(); + return new ImmutablePair<>(jobName, jobConfig.getJobId()); + } + + /** + * If settings for auto.reset in Samza are different from settings in Kafka (auto.offset.reset), + * then we need to convert them (see kafka.apache.org/documentation): + * "largest" -> "latest" + * "smallest" -> "earliest" + * + * If no setting is specified we return "latest" (same as Kafka).
+ * @param autoOffsetReset value from the app provided config + * @return String representing the config value for "auto.offset.reset" property + */ + static String getAutoOffsetResetValue(final String autoOffsetReset) { + final String SAMZA_OFFSET_LARGEST = "largest"; + final String SAMZA_OFFSET_SMALLEST = "smallest"; + final String KAFKA_OFFSET_LATEST = "latest"; + final String KAFKA_OFFSET_EARLIEST = "earliest"; + final String KAFKA_OFFSET_NONE = "none"; + + if (autoOffsetReset == null) { + return KAFKA_OFFSET_LATEST; // return default + } + + // accept kafka values directly + if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST) + || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) { + return autoOffsetReset; + } + + String newAutoOffsetReset; + switch (autoOffsetReset) { + case SAMZA_OFFSET_LARGEST: + newAutoOffsetReset = KAFKA_OFFSET_LATEST; + break; + case SAMZA_OFFSET_SMALLEST: + newAutoOffsetReset = KAFKA_OFFSET_EARLIEST; + break; + default: + newAutoOffsetReset = KAFKA_OFFSET_LATEST; + } + LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset); + return newAutoOffsetReset; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java new file mode 100644 index 0000000..cb2db10 --- /dev/null +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java @@ -0,0 +1,665 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.system.kafka; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import kafka.admin.AdminClient; +import kafka.utils.ZkUtils; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.config.SystemConfig; +import org.apache.samza.system.ExtendedSystemAdmin; +import org.apache.samza.system.StreamSpec; +import org.apache.samza.system.StreamValidationException; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.ExponentialSleepStrategy; +import org.apache.samza.util.ScalaJavaUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Function0; +import scala.Function1; +import scala.Function2; +import scala.collection.JavaConverters; +import scala.runtime.AbstractFunction0; +import scala.runtime.AbstractFunction1; +import scala.runtime.AbstractFunction2; +import scala.runtime.BoxedUnit; + +import static org.apache.samza.config.KafkaConsumerConfig.*; + + +public class KafkaSystemAdmin implements ExtendedSystemAdmin { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemAdmin.class); + + // Default exponential sleep strategy values + protected static final double DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER = 2.0; + protected static final long DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS = 500; + protected static final long DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS = 10000; + protected static final int MAX_RETRIES_ON_EXCEPTION = 5; + protected static final int DEFAULT_REPL_FACTOR = 2; + + // used in TestRepartitionJoinWindowApp TODO - remove SAMZA-1945 + @VisibleForTesting + public static volatile boolean deleteMessageCalled = false; + + protected final String systemName; + protected final Consumer metadataConsumer; + + // get ZkUtils object to connect to Kafka's ZK. + private final Supplier<ZkUtils> getZkConnection; + + // Custom properties to create a new coordinator stream. + private final Properties coordinatorStreamProperties; + + // Replication factor for a new coordinator stream. 
+ private final int coordinatorStreamReplicationFactor; + + // Replication factor and kafka properties for changelog topic creation + private final Map<String, ChangelogInfo> changelogTopicMetaInformation; + + // Kafka properties for intermediate topics creation + private final Map<String, Properties> intermediateStreamProperties; + + // adminClient is required for deleteCommittedMessages operation + private final AdminClient adminClient; + + // used for intermediate streams + private final boolean deleteCommittedMessages; + + private final AtomicBoolean stopped = new AtomicBoolean(false); + + public KafkaSystemAdmin(String systemName, Config config, Consumer metadataConsumer) { + this.systemName = systemName; + + if (metadataConsumer == null) { + throw new SamzaException( + "Cannot construct KafkaSystemAdmin for system " + systemName + " with null metadataConsumer"); + } + this.metadataConsumer = metadataConsumer; + + // populate brokerList from either consumer or producer configs + Properties props = new Properties(); + String brokerList = config.get( + String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + if (brokerList == null) { + brokerList = config.get(String.format(KafkaConfig.PRODUCER_CONFIGS_CONFIG_KEY(), systemName, + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + } + if (brokerList == null) { + throw new SamzaException( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " is required for systemAdmin for system " + systemName); + } + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); + + // kafka.admin.AdminUtils requires zkConnect + // this will change after we move to the new org.apache..AdminClient + String zkConnect = + config.get(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ZOOKEEPER_CONNECT)); + if (StringUtils.isBlank(zkConnect)) { + throw new SamzaException("Missing zookeeper.connect config for admin for system " + systemName); + } + props.put(ZOOKEEPER_CONNECT, zkConnect); + + adminClient = AdminClient.create(props); + + getZkConnection = () -> { + return ZkUtils.apply(zkConnect, 6000, 6000, false); + }; + + KafkaConfig kafkaConfig = new KafkaConfig(config); + coordinatorStreamReplicationFactor = Integer.valueOf(kafkaConfig.getCoordinatorReplicationFactor()); + coordinatorStreamProperties = KafkaSystemAdminUtilsScala.getCoordinatorTopicProperties(kafkaConfig); + + Map<String, String> storeToChangelog = + JavaConverters.mapAsJavaMapConverter(kafkaConfig.getKafkaChangelogEnabledStores()).asJava(); + // Construct the meta information for each topic, if the replication factor is not defined, + // we use 2 (DEFAULT_REPL_FACTOR) as the number of replicas for the change log stream. + changelogTopicMetaInformation = new HashMap<>(); + for (Map.Entry<String, String> e : storeToChangelog.entrySet()) { + String storeName = e.getKey(); + String topicName = e.getValue(); + String replicationFactorStr = kafkaConfig.getChangelogStreamReplicationFactor(storeName); + int replicationFactor = + StringUtils.isEmpty(replicationFactorStr) ? 
DEFAULT_REPL_FACTOR : Integer.valueOf(replicationFactorStr); + ChangelogInfo changelogInfo = + new ChangelogInfo(replicationFactor, kafkaConfig.getChangelogKafkaProperties(storeName)); + LOG.info(String.format("Creating topic meta information for topic: %s with replication factor: %s", topicName, + replicationFactor)); + changelogTopicMetaInformation.put(topicName, changelogInfo); + } + + // special flag to allow/enforce deleting of committed messages + SystemConfig systemConfig = new SystemConfig(config); + this.deleteCommittedMessages = systemConfig.deleteCommittedMessages(systemName); + + intermediateStreamProperties = + JavaConverters.mapAsJavaMapConverter(KafkaSystemAdminUtilsScala.getIntermediateStreamProperties(config)) + .asJava(); + + LOG.info(String.format("Created KafkaSystemAdmin for system %s", systemName)); + } + + @Override + public void start() { + // Please note: there is a slight inconsistency in the use of this class. + // Some of the functionality of this class may actually be used BEFORE start() is called. + // The SamzaContainer gets metadata (using this class) in SamzaContainer.apply, + // but this "start" actually gets called in SamzaContainer.run. + // review this usage (SAMZA-1888) + + // Throw exception if start is called after stop + if (stopped.get()) { + throw new IllegalStateException("KafkaSystemAdmin.start() is called after stop()"); + } + } + + @Override + public void stop() { + if (stopped.compareAndSet(false, true)) { + try { + metadataConsumer.close(); + } catch (Exception e) { + LOG.warn("metadataConsumer.close for system " + systemName + " failed with exception.", e); + } + try { + adminClient.close(); + } catch (Exception e) { + LOG.warn("adminClient.close for system " + systemName + " failed with exception.", e); + } + } + } + + /** + * Note! This method does not populate SystemStreamMetadata for each stream with real data. + * Thus, this method should ONLY be used to get the number of partitions for each stream. + * It will throw NotImplementedException if anyone tries to access the actual metadata. + * @param streamNames set of streams for which to get the partition counts + * @param cacheTTL cache TTL if caching the data + * @return a map, keyed on stream names. Number of partitions in SystemStreamMetadata is the output of this method. + */ + @Override + public Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) { + // This optimization omits actual metadata for performance. Instead, we inject a dummy for all partitions. + final SystemStreamMetadata.SystemStreamPartitionMetadata dummySspm = + new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null) { + String msg = + "getSystemStreamPartitionCounts does not populate SystemStreamMetadata info.
Only number of partitions"; + + @Override + public String getOldestOffset() { + throw new NotImplementedException(msg); + } + + @Override + public String getNewestOffset() { + throw new NotImplementedException(msg); + } + + @Override + public String getUpcomingOffset() { + throw new NotImplementedException(msg); + } + }; + + ExponentialSleepStrategy strategy = new ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER, + DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS); + + Function1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>> fetchMetadataOperation = + new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>>() { + @Override + public Map<String, SystemStreamMetadata> apply(ExponentialSleepStrategy.RetryLoop loop) { + Map<String, SystemStreamMetadata> allMetadata = new HashMap<>(); + + streamNames.forEach(streamName -> { + Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>(); + + List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(streamName); + LOG.debug("Stream {} has partitions {}", streamName, partitionInfos); + + partitionInfos.forEach(partitionInfo -> { + partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm); + }); + + allMetadata.put(streamName, new SystemStreamMetadata(streamName, partitionMetadata)); + }); + + loop.done(); + return allMetadata; + } + }; + + Map<String, SystemStreamMetadata> result = strategy.run(fetchMetadataOperation, + new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() { + @Override + public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) { + if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) { + LOG.warn(String.format("Fetching systemstreampartition counts for: %s threw an exception. Retrying.", + streamNames), exception); + } else { + LOG.error(String.format("Fetching systemstreampartition counts for: %s threw an exception.", streamNames), + exception); + loop.done(); + throw new SamzaException(exception); + } + return null; + } + }).get(); + + LOG.info("SystemStream partition counts for system {}: {}", systemName, result); + return result; + } + + @Override + public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) { + // This is safe to do with Kafka, even if a topic is key-deduped. If the + // offset doesn't exist on a compacted topic, Kafka will return the first + // message AFTER the offset that was specified in the fetch request. 
+ return offsets.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, (entry) -> String.valueOf(Long.valueOf(entry.getValue()) + 1))); + } + + @Override + public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) { + return getSystemStreamMetadata(streamNames, + new ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER, + DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS)); + } + + @Override + public Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> getSSPMetadata( + Set<SystemStreamPartition> ssps) { + + LOG.info("Fetching SSP metadata for: {}", ssps); + List<TopicPartition> topicPartitions = ssps.stream() + .map(ssp -> new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId())) + .collect(Collectors.toList()); + + OffsetsMaps topicPartitionsMetadata = fetchTopicPartitionsMetadata(topicPartitions); + + Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> sspToSSPMetadata = new HashMap<>(); + for (SystemStreamPartition ssp : ssps) { + String oldestOffset = topicPartitionsMetadata.getOldestOffsets().get(ssp); + String newestOffset = topicPartitionsMetadata.getNewestOffsets().get(ssp); + String upcomingOffset = topicPartitionsMetadata.getUpcomingOffsets().get(ssp); + + sspToSSPMetadata.put(ssp, + new SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset)); + } + return sspToSSPMetadata; + } + + /** + * Given a set of stream names (topics), fetch metadata from Kafka for each + * stream, and return a map from stream name to SystemStreamMetadata for + * each stream. This method will return null for oldest and newest offsets + * if a given SystemStreamPartition is empty. This method will block and + * retry indefinitely until it gets a successful response from Kafka. + * + * @param streamNames a set of strings of stream names/topics + * @param retryBackoff retry backoff strategy + * @return a map from topic to SystemStreamMetadata which has offsets for each partition + */ + public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames, + ExponentialSleepStrategy retryBackoff) { + + LOG.info("Fetching system stream metadata for {} from system {}", streamNames, systemName); + + Function1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>> fetchMetadataOperation = + new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, Map<String, SystemStreamMetadata>>() { + @Override + public Map<String, SystemStreamMetadata> apply(ExponentialSleepStrategy.RetryLoop loop) { + Map<String, SystemStreamMetadata> metadata = fetchSystemStreamMetadata(streamNames); + loop.done(); + return metadata; + } + }; + + Function2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit> onExceptionRetryOperation = + new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() { + @Override + public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) { + if (loop.sleepCount() < MAX_RETRIES_ON_EXCEPTION) { + LOG.warn( + String.format("Fetching system stream metadata for: %s threw an exception. 
Retrying.", streamNames), + exception); + } else { + LOG.error(String.format("Fetching system stream metadata for: %s threw an exception.", streamNames), + exception); + loop.done(); + throw new SamzaException(exception); + } + + return null; + } + }; + + Function0<Map<String, SystemStreamMetadata>> fallbackOperation = + new AbstractFunction0<Map<String, SystemStreamMetadata>>() { + @Override + public Map<String, SystemStreamMetadata> apply() { + throw new SamzaException("Failed to get system stream metadata"); + } + }; + + Map<String, SystemStreamMetadata> result = + retryBackoff.run(fetchMetadataOperation, onExceptionRetryOperation).getOrElse(fallbackOperation); + return result; + } + + @Override + public String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries) { + LOG.info("Fetching newest offset for: {}", ssp); + + ExponentialSleepStrategy strategy = new ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER, + DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS); + + Function1<ExponentialSleepStrategy.RetryLoop, String> fetchNewestOffset = + new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, String>() { + @Override + public String apply(ExponentialSleepStrategy.RetryLoop loop) { + String result = fetchNewestOffset(ssp); + loop.done(); + return result; + } + }; + + String offset = strategy.run(fetchNewestOffset, + new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() { + @Override + public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) { + if (loop.sleepCount() < maxRetries) { + LOG.warn(String.format("Fetching newest offset for: %s threw an exception. Retrying.", ssp), exception); + } else { + LOG.error(String.format("Fetching newest offset for: %s threw an exception.", ssp), exception); + loop.done(); + throw new SamzaException("Exception while trying to get newest offset", exception); + } + return null; + } + }).get(); + + return offset; + } + + /** + * Convert TopicPartition to SystemStreamPartition + * @param topicPartition the topic partition to be created + * @return an instance of SystemStreamPartition + */ + private SystemStreamPartition toSystemStreamPartition(TopicPartition topicPartition) { + String topic = topicPartition.topic(); + Partition partition = new Partition(topicPartition.partition()); + return new SystemStreamPartition(systemName, topic, partition); + } + + /** + * Uses {@code metadataConsumer} to fetch the metadata for the {@code topicPartitions}. + * Warning: If multiple threads call this with the same {@code metadataConsumer}, then this will not protect against + * concurrent access to the {@code metadataConsumer}. 
+ */ + private OffsetsMaps fetchTopicPartitionsMetadata(List<TopicPartition> topicPartitions) { + Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>(); + Map<SystemStreamPartition, String> newestOffsets = new HashMap<>(); + Map<SystemStreamPartition, String> upcomingOffsets = new HashMap<>(); + + Map<TopicPartition, Long> oldestOffsetsWithLong = metadataConsumer.beginningOffsets(topicPartitions); + LOG.debug("Kafka-fetched beginningOffsets: {}", oldestOffsetsWithLong); + Map<TopicPartition, Long> upcomingOffsetsWithLong = metadataConsumer.endOffsets(topicPartitions); + LOG.debug("Kafka-fetched endOffsets: {}", upcomingOffsetsWithLong); + + oldestOffsetsWithLong.forEach((topicPartition, offset) -> { + oldestOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset)); + }); + + upcomingOffsetsWithLong.forEach((topicPartition, offset) -> { + upcomingOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset)); + + // Kafka's beginning Offset corresponds to the offset for the oldest message. + // Kafka's end offset corresponds to the offset for the upcoming message, and it is the newest offset + 1. + // When upcoming offset is <=0, the topic appears empty, we put oldest offset 0 and the newest offset null. + // When upcoming offset is >0, we subtract the upcoming offset by one for the newest offset. + // For normal case, the newest offset will correspond to the offset of the newest message in the stream; + // But for the big message, it is not the case. Seeking on the newest offset gives nothing for the newest big message. + // For now, we keep it as is for newest offsets the same as historical metadata structure. + if (offset <= 0) { + LOG.warn( + "Empty Kafka topic partition {} with upcoming offset {}. Skipping newest offset and setting oldest offset to 0 to consume from beginning", + topicPartition, offset); + oldestOffsets.put(toSystemStreamPartition(topicPartition), "0"); + } else { + newestOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset - 1)); + } + }); + return new OffsetsMaps(oldestOffsets, newestOffsets, upcomingOffsets); + } + + /** + * Fetch SystemStreamMetadata for each topic with the consumer + * @param topics set of topics to get metadata info for + * @return map of topic to SystemStreamMetadata + */ + private Map<String, SystemStreamMetadata> fetchSystemStreamMetadata(Set<String> topics) { + Map<SystemStreamPartition, String> allOldestOffsets = new HashMap<>(); + Map<SystemStreamPartition, String> allNewestOffsets = new HashMap<>(); + Map<SystemStreamPartition, String> allUpcomingOffsets = new HashMap<>(); + + LOG.info("Fetching SystemStreamMetadata for topics {} on system {}", topics, systemName); + + topics.forEach(topic -> { + List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic); + + if (partitionInfos == null) { + String msg = String.format("Partition info not(yet?) 
available for system %s topic %s", systemName, topic); + throw new SamzaException(msg); + } + + List<TopicPartition> topicPartitions = partitionInfos.stream() + .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) + .collect(Collectors.toList()); + + OffsetsMaps offsetsForTopic = fetchTopicPartitionsMetadata(topicPartitions); + allOldestOffsets.putAll(offsetsForTopic.getOldestOffsets()); + allNewestOffsets.putAll(offsetsForTopic.getNewestOffsets()); + allUpcomingOffsets.putAll(offsetsForTopic.getUpcomingOffsets()); + }); + + scala.collection.immutable.Map<String, SystemStreamMetadata> result = + KafkaSystemAdminUtilsScala.assembleMetadata(ScalaJavaUtil.toScalaMap(allOldestOffsets), + ScalaJavaUtil.toScalaMap(allNewestOffsets), ScalaJavaUtil.toScalaMap(allUpcomingOffsets)); + + LOG.debug("assembled SystemStreamMetadata is: {}", result); + return JavaConverters.mapAsJavaMapConverter(result).asJava(); + } + + private String fetchNewestOffset(SystemStreamPartition ssp) { + LOG.debug("Fetching newest offset for {}", ssp); + String newestOffset; + + TopicPartition topicPartition = new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId()); + + // the offsets returned from the consumer are of the Long type + Long upcomingOffset = + (Long) metadataConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition); + + // Kafka's "latest" offset is always last message in stream's offset + 1, + // so get newest message in stream by subtracting one. This is safe + // even for key-deduplicated streams, since the last message will + // never be deduplicated. + if (upcomingOffset <= 0) { + LOG.debug("Stripping newest offsets for {} because the topic appears empty.", topicPartition); + newestOffset = null; + } else { + newestOffset = String.valueOf(upcomingOffset - 1); + } + + LOG.info("Newest offset for ssp {} is: {}", ssp, newestOffset); + return newestOffset; + } + + @Override + public Integer offsetComparator(String offset1, String offset2) { + if (offset1 == null || offset2 == null) { + return -1; + } + + return Long.valueOf(offset1).compareTo(Long.valueOf(offset2)); + } + + @Override + public boolean createStream(StreamSpec streamSpec) { + LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName()); + + return KafkaSystemAdminUtilsScala.createStream(toKafkaSpec(streamSpec), getZkConnection); + } + + @Override + public boolean clearStream(StreamSpec streamSpec) { + LOG.info("Clearing Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName()); + + KafkaSystemAdminUtilsScala.clearStream(streamSpec, getZkConnection); + + Map<String, List<PartitionInfo>> topicsMetadata = getTopicMetadata(ImmutableSet.of(streamSpec.getPhysicalName())); + return topicsMetadata.get(streamSpec.getPhysicalName()).isEmpty(); + } + + /** + * Converts a StreamSpec into a KafkaStreamSpec. Special handling for coordinator and changelog streams.
+ * @param spec a StreamSpec object + * @return KafkaStreamSpec object + */ + KafkaStreamSpec toKafkaSpec(StreamSpec spec) { + KafkaStreamSpec kafkaSpec; + if (spec.isChangeLogStream()) { + String topicName = spec.getPhysicalName(); + ChangelogInfo topicMeta = changelogTopicMetaInformation.get(topicName); + if (topicMeta == null) { + throw new StreamValidationException("Unable to find topic information for topic " + topicName); + } + + kafkaSpec = new KafkaStreamSpec(spec.getId(), topicName, systemName, spec.getPartitionCount(), + topicMeta.replicationFactor(), topicMeta.kafkaProps()); + } else if (spec.isCoordinatorStream()) { + kafkaSpec = + new KafkaStreamSpec(spec.getId(), spec.getPhysicalName(), systemName, 1, coordinatorStreamReplicationFactor, + coordinatorStreamProperties); + } else if (intermediateStreamProperties.containsKey(spec.getId())) { + kafkaSpec = KafkaStreamSpec.fromSpec(spec).copyWithProperties(intermediateStreamProperties.get(spec.getId())); + } else { + kafkaSpec = KafkaStreamSpec.fromSpec(spec); + } + return kafkaSpec; + } + + @Override + public void validateStream(StreamSpec streamSpec) throws StreamValidationException { + LOG.info("About to validate stream = " + streamSpec); + + String streamName = streamSpec.getPhysicalName(); + SystemStreamMetadata systemStreamMetadata = + getSystemStreamMetadata(Collections.singleton(streamName)).get(streamName); + if (systemStreamMetadata == null) { + throw new StreamValidationException( + "Failed to obtain metadata for stream " + streamName + ". Validation failed."); + } + + int actualPartitionCounter = systemStreamMetadata.getSystemStreamPartitionMetadata().size(); + int expectedPartitionCounter = streamSpec.getPartitionCount(); + LOG.info("actualCount=" + actualPartitionCounter + "; expectedCount=" + expectedPartitionCounter); + if (actualPartitionCounter != expectedPartitionCounter) { + throw new StreamValidationException( + String.format("Mismatch of partitions for stream %s. Expected %d, got %d. Validation failed.", streamName, + expectedPartitionCounter, actualPartitionCounter)); + } + } + + // get partition info for topic + Map<String, List<PartitionInfo>> getTopicMetadata(Set<String> topics) { + Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap(); + List<PartitionInfo> partitionInfoList; + for (String topic : topics) { + partitionInfoList = metadataConsumer.partitionsFor(topic); + streamToPartitionsInfo.put(topic, partitionInfoList); + } + + return streamToPartitionsInfo; + } + + /** + * Delete records up to (and including) the provided ssp offsets for + * all system stream partitions specified in the map. + * This only works with Kafka cluster 0.11 or later. Otherwise it's a no-op. + * @param offsets specifies up to what offsets the messages should be deleted + */ + @Override + public void deleteMessages(Map<SystemStreamPartition, String> offsets) { + if (deleteCommittedMessages) { + KafkaSystemAdminUtilsScala.deleteMessages(adminClient, offsets); + deleteMessageCalled = true; + } + } + + /** + * Container for metadata about offsets. 
+ */ + private static class OffsetsMaps { + private final Map<SystemStreamPartition, String> oldestOffsets; + private final Map<SystemStreamPartition, String> newestOffsets; + private final Map<SystemStreamPartition, String> upcomingOffsets; + + private OffsetsMaps(Map<SystemStreamPartition, String> oldestOffsets, + Map<SystemStreamPartition, String> newestOffsets, Map<SystemStreamPartition, String> upcomingOffsets) { + this.oldestOffsets = oldestOffsets; + this.newestOffsets = newestOffsets; + this.upcomingOffsets = upcomingOffsets; + } + + private Map<SystemStreamPartition, String> getOldestOffsets() { + return oldestOffsets; + } + + private Map<SystemStreamPartition, String> getNewestOffsets() { + return newestOffsets; + } + + private Map<SystemStreamPartition, String> getUpcomingOffsets() { + return upcomingOffsets; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java new file mode 100644 index 0000000..65d0e42 --- /dev/null +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java @@ -0,0 +1,366 @@ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.samza.system.kafka; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import kafka.common.TopicAndPartition; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.BlockingEnvelopeMap; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + + +public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements SystemConsumer { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemConsumer.class); + + private static final long FETCH_THRESHOLD = 50000; + private static final long FETCH_THRESHOLD_BYTES = -1L; + + protected final Consumer<K, V> kafkaConsumer; + protected final String systemName; + protected final String clientId; + private final AtomicBoolean stopped = new AtomicBoolean(false); + private final AtomicBoolean started = new AtomicBoolean(false); + private final Config config; + private final boolean fetchThresholdBytesEnabled; + private final KafkaSystemConsumerMetrics metrics; + + // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap. + final KafkaConsumerMessageSink messageSink; + + // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates + // BlockingEnvelopMap's buffers. + final private KafkaConsumerProxy proxy; + + // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets + final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>(); + final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>(); + + long perPartitionFetchThreshold; + long perPartitionFetchThresholdBytes; + + /** + * Create a KafkaSystemConsumer for the provided {@code systemName} + * @param systemName system name for which we create the consumer + * @param config application config + * @param metrics metrics for this KafkaSystemConsumer + * @param clock system clock + */ + public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId, + KafkaSystemConsumerMetrics metrics, Clock clock) { + + super(metrics.registry(), clock, metrics.getClass().getName()); + + this.kafkaConsumer = kafkaConsumer; + this.clientId = clientId; + this.systemName = systemName; + this.config = config; + this.metrics = metrics; + + fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName); + + // create a sink for passing the messages between the proxy and the consumer + messageSink = new KafkaConsumerMessageSink(); + + // Create the proxy to do the actual message reading. + String metricName = String.format("%s-%s", systemName, clientId); + proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, metrics, metricName); + LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy); + } + + /** + * Create internal kafka consumer object, which will be used in the Proxy. 
+ * @param systemName system name for which we create the consumer + * @param kafkaConsumerConfig config object for Kafka's KafkaConsumer + * @return KafkaConsumer object + */ + public static <K,V> KafkaConsumer<K, V> createKafkaConsumerImpl(String systemName, + HashMap<String, Object> kafkaConsumerConfig) { + + LOG.info("Instantiating KafkaConsumer for systemName {} with properties {}", systemName, kafkaConsumerConfig); + return new KafkaConsumer<>(kafkaConsumerConfig); + } + + @Override + public void start() { + if (!started.compareAndSet(false, true)) { + LOG.warn("{}: Attempting to start the consumer for the second (or more) time.", this); + return; + } + if (stopped.get()) { + LOG.error("{}: Attempting to start a stopped consumer", this); + return; + } + // initialize the subscriptions for all the registered TopicPartitions + startSubscription(); + // needs to be called after all the registrations are completed + setFetchThresholds(); + + startConsumer(); + LOG.info("{}: Consumer started", this); + } + + private void startSubscription() { + // subscribe to all the registered TopicPartitions + LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet()); + try { + synchronized (kafkaConsumer) { + // we are using assign (and not subscribe), so we need to specify both topic and partition + kafkaConsumer.assign(topicPartitionsToSSP.keySet()); + } + } catch (Exception e) { + throw new SamzaException("Consumer subscription failed for " + this, e); + } + } + + /** + * Set the offsets to start from. + * Register the TopicPartitions with the proxy. + * Start the proxy. + */ + void startConsumer() { + // set the offset for each TopicPartition + if (topicPartitionsToOffset.size() <= 0) { + LOG.error("{}: Consumer is not subscribed to any SSPs", this); + } + + topicPartitionsToOffset.forEach((tp, startingOffsetString) -> { + long startingOffset = Long.valueOf(startingOffsetString); + + try { + synchronized (kafkaConsumer) { + kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value + } + } catch (Exception e) { + // all recoverable exceptions are handled by the client. + // if we get here there is nothing left to do but bail out. + String msg = + String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, tp); + LOG.error(msg, e); + throw new SamzaException(msg, e); + } + + LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString); + + // add the partition to the proxy + proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset); + }); + + // start the proxy thread + if (proxy != null && !proxy.isRunning()) { + LOG.info("{}: Starting proxy {}", this, proxy); + proxy.start(); + } + } + + private void setFetchThresholds() { + // get the thresholds, and set defaults if not defined.
+ KafkaConfig kafkaConfig = new KafkaConfig(config); + + Option<String> fetchThresholdOption = kafkaConfig.getConsumerFetchThreshold(systemName); + long fetchThreshold = FETCH_THRESHOLD; + if (fetchThresholdOption.isDefined()) { + fetchThreshold = Long.valueOf(fetchThresholdOption.get()); + } + + Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName); + long fetchThresholdBytes = FETCH_THRESHOLD_BYTES; + if (fetchThresholdBytesOption.isDefined()) { + fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get()); + } + + int numPartitions = topicPartitionsToSSP.size(); + if (numPartitions != topicPartitionsToOffset.size()) { + throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()"); + } + + + if (numPartitions > 0) { + perPartitionFetchThreshold = fetchThreshold / numPartitions; + if (fetchThresholdBytesEnabled) { + // currently this feature cannot be enabled, because we do not have the size of the messages available. + // messages get double buffered, hence divide by 2 + perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numPartitions; + } + } + LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; numPartitions={}, perPartitionFetchThreshold={}, perPartitionFetchThresholdBytes(0 if disabled)={}", + this, fetchThresholdBytes, fetchThreshold, numPartitions, perPartitionFetchThreshold, perPartitionFetchThresholdBytes); + } + + @Override + public void stop() { + if (!stopped.compareAndSet(false, true)) { + LOG.warn("{}: Attempting to stop stopped consumer.", this); + return; + } + + LOG.info("{}: Stopping Samza kafkaConsumer ", this); + + // stop the proxy (with 1 minute timeout) + if (proxy != null) { + LOG.info("{}: Stopping proxy {}", this, proxy); + proxy.stop(TimeUnit.SECONDS.toMillis(60)); + } + + try { + synchronized (kafkaConsumer) { + LOG.info("{}: Closing kafkaSystemConsumer {}", this, kafkaConsumer); + kafkaConsumer.close(); + } + } catch (Exception e) { + LOG.warn("{}: Failed to stop KafkaSystemConsumer.", this, e); + } + } + + /** + * record the ssp and the offset. Do not submit it to the consumer yet. + * @param systemStreamPartition ssp to register + * @param offset offset to register with + */ + @Override + public void register(SystemStreamPartition systemStreamPartition, String offset) { + if (started.get()) { + String msg = String.format("%s: Trying to register partition after consumer has been started. ssp=%s", this, + systemStreamPartition); + throw new SamzaException(msg); + } + + if (!systemStreamPartition.getSystem().equals(systemName)) { + LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition); + return; + } + LOG.info("{}: Registering ssp = {} with offset {}", this, systemStreamPartition, offset); + + super.register(systemStreamPartition, offset); + + TopicPartition tp = toTopicPartition(systemStreamPartition); + + topicPartitionsToSSP.put(tp, systemStreamPartition); + + String existingOffset = topicPartitionsToOffset.get(tp); + // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages. + if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) { + topicPartitionsToOffset.put(tp, offset); + } + + metrics.registerTopicAndPartition(toTopicAndPartition(tp)); + } + + /** + * Compare two String offsets. + * Note. There is a method in KafkaSystemAdmin that does that, but that would require instantiation of systemadmin for each consumer. 
+ * @return see {@link Long#compareTo(Long)} + */ + private static int compareOffsets(String offset1, String offset2) { + return Long.valueOf(offset1).compareTo(Long.valueOf(offset2)); + } + + @Override + public String toString() { + return String.format("%s:%s", systemName, clientId); + } + + @Override + public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll( + Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException { + + // check if the proxy is running + if (!proxy.isRunning()) { + stop(); + String message = String.format("%s: KafkaConsumerProxy has stopped.", this); + throw new SamzaException(message, proxy.getFailureCause()); + } + + return super.poll(systemStreamPartitions, timeout); + } + + /** + * convert from TopicPartition to TopicAndPartition + */ + public static TopicAndPartition toTopicAndPartition(TopicPartition tp) { + return new TopicAndPartition(tp.topic(), tp.partition()); + } + + /** + * convert to TopicPartition from SystemStreamPartition + */ + public static TopicPartition toTopicPartition(SystemStreamPartition ssp) { + return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId()); + } + + /** + * return system name for this consumer + * @return system name + */ + public String getSystemName() { + return systemName; + } + + public class KafkaConsumerMessageSink { + + public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) { + setIsAtHead(ssp, isAtHighWatermark); + } + + boolean needsMoreMessages(SystemStreamPartition ssp) { + LOG.debug("{}: needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};" + + "(limit={}); messagesNumInQueue={}(limit={};", this, ssp, fetchThresholdBytesEnabled, + getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp), + perPartitionFetchThreshold); + + if (fetchThresholdBytesEnabled) { + return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes; + } else { + return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold; + } + } + + void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) { + LOG.trace("{}: Incoming message ssp = {}: envelope = {}.", this, ssp, envelope); + + try { + put(ssp, envelope); + } catch (InterruptedException e) { + throw new SamzaException( + String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s", this, + envelope.getOffset(), ssp)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index e5cca36..f492518 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -62,6 +62,11 @@ object KafkaConfig { val JOB_COORDINATOR_REPLICATION_FACTOR = "job.coordinator." + TOPIC_REPLICATION_FACTOR val JOB_COORDINATOR_SEGMENT_BYTES = "job.coordinator." 
+ SEGMENT_BYTES + val CONSUMER_CONFIGS_CONFIG_KEY = "systems.%s.consumer.%s" + val PRODUCER_BOOTSTRAP_SERVERS_CONFIG_KEY = "systems.%s.producer.bootstrap.servers" + val PRODUCER_CONFIGS_CONFIG_KEY = "systems.%s.producer.%s" + val CONSUMER_ZK_CONNECT_CONFIG_KEY = "systems.%s.consumer.zookeeper.connect" + /** * Defines how low a queue can get for a single system/stream/partition * combination before trying to fetch more messages for it. http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java deleted file mode 100644 index 6cebc28..0000000 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.samza.config; - -import java.util.HashMap; -import java.util.Map; -import org.apache.commons.lang3.StringUtils; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.RangeAssignor; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.samza.SamzaException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Option; - - -/** - * The configuration class for KafkaConsumer - */ -public class KafkaConsumerConfig extends HashMap<String, Object> { - - public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class); - - static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer"; - static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer"; - static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin"; - - /* - * By default, KafkaConsumer will fetch some big number of available messages for all the partitions. - * This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll(). - */ - static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100"; - - private KafkaConsumerConfig(Map<String, Object> map) { - super(map); - } - - /** - * Helper method to create configs for use in Kafka consumer. - * The values are based on the "consumer" subset of the configs provided by the app and Samza overrides. - * - * @param config config provided by the app. - * @param systemName system name to get the consumer configuration for. - * @param clientId client id to be used in the Kafka consumer. 
http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
deleted file mode 100644
index 6cebc28..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.config;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.RangeAssignor;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.samza.SamzaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-
-
-/**
- * The configuration class for KafkaConsumer
- */
-public class KafkaConsumerConfig extends HashMap<String, Object> {
-
-  public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class);
-
-  static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
-  static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
-  static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin";
-
-  /*
-   * By default, KafkaConsumer will fetch a large number of available messages for all the partitions.
-   * This may cause memory issues. That's why we limit the number of messages per partition we get on EACH poll().
-   */
-  static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";
-
-  private KafkaConsumerConfig(Map<String, Object> map) {
-    super(map);
-  }
-
-  /**
-   * Helper method to create configs for use in the Kafka consumer.
-   * The values are based on the "consumer" subset of the configs provided by the app and Samza overrides.
-   *
-   * @param config config provided by the app.
-   * @param systemName system name to get the consumer configuration for.
-   * @param clientId client id to be used in the Kafka consumer.
-   * @return KafkaConsumerConfig
-   */
-  public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) {
-
-    Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true);
-
-    // Kafka client configuration
-    String groupId = getConsumerGroupId(config);
-
-    Map<String, Object> consumerProps = new HashMap<>();
-    consumerProps.putAll(subConf);
-
-    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
-
-    // These are values we enforce in Samza, and they cannot be overwritten.
-
-    // Disable consumer auto-commit because Samza controls commits
-    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
-    // Translate the Samza config value to the Kafka config value
-    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
-
-    // If consumer bootstrap servers are not configured, get them from the producer configs
-    if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
-      String bootstrapServers =
-          config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-      if (StringUtils.isEmpty(bootstrapServers)) {
-        throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for " + systemName);
-      }
-      consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-    }
-
-    // Always use the default partition assignment strategy. Do not allow override.
-    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
-
-    // The consumer is fully typed, and deserialization can be too. But in case it is not provided we should
-    // default to byte[].
-    if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
-      LOG.info("Setting key deserialization for the consumer (for system {}) to ByteArrayDeserializer", systemName);
-      consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-    }
-    if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
-      LOG.info("Setting value deserialization for the consumer (for system {}) to ByteArrayDeserializer", systemName);
-      consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-    }
-
-    // Apply the default max-poll-records limit if the app did not configure one
-    consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
-        (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
-
-    return new KafkaConsumerConfig(consumerProps);
-  }
-
-  // Group id should be unique per job
-  static String getConsumerGroupId(Config config) {
-    JobConfig jobConfig = new JobConfig(config);
-    Option jobNameOption = jobConfig.getName();
-    if (jobNameOption.isEmpty()) {
-      throw new ConfigException("Missing job name");
-    }
-    String jobName = (String) jobNameOption.get();
-
-    String jobId = jobConfig.getJobId();
-
-    return String.format("%s-%s", jobName, jobId);
-  }
-
-  // Client id should be unique per job
-  public static String getConsumerClientId(Config config) {
-    return getConsumerClientId(CONSUMER_CLIENT_ID_PREFIX, config);
-  }
-
-  public static String getProducerClientId(Config config) {
-    return getConsumerClientId(PRODUCER_CLIENT_ID_PREFIX, config);
-  }
-
-  public static String getAdminClientId(Config config) {
-    return getConsumerClientId(ADMIN_CLIENT_ID_PREFIX, config);
-  }
-
-  static String getConsumerClientId(String id, Config config) {
-    JobConfig jobConfig = new JobConfig(config);
-    Option jobNameOption = jobConfig.getName();
-    if (jobNameOption.isEmpty()) {
-      throw new ConfigException("Missing job name");
-    }
-    String jobName = (String) jobNameOption.get();
-
-    String jobId = jobConfig.getJobId();
-
-    return String.format("%s-%s-%s", id.replaceAll("\\W", "_"), jobName.replaceAll("\\W", "_"),
-        jobId.replaceAll("\\W", "_"));
-  }
-
-  /**
-   * If the setting for auto.reset in Samza differs from Kafka's vocabulary (auto.offset.reset),
-   * it needs to be converted (see kafka.apache.org/documentation):
-   * "largest" -> "latest"
-   * "smallest" -> "earliest"
-   *
-   * If no setting is specified we return "latest" (same as Kafka).
-   * @param autoOffsetReset value from the app provided config
-   * @return String representing the config value for the "auto.offset.reset" property
-   */
-  static String getAutoOffsetResetValue(final String autoOffsetReset) {
-    final String SAMZA_OFFSET_LARGEST = "largest";
-    final String SAMZA_OFFSET_SMALLEST = "smallest";
-    final String KAFKA_OFFSET_LATEST = "latest";
-    final String KAFKA_OFFSET_EARLIEST = "earliest";
-    final String KAFKA_OFFSET_NONE = "none";
-
-    if (autoOffsetReset == null) {
-      return KAFKA_OFFSET_LATEST; // return default
-    }
-
-    // Accept Kafka values directly
-    if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST)
-        || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
-      return autoOffsetReset;
-    }
-
-    String newAutoOffsetReset;
-    switch (autoOffsetReset) {
-      case SAMZA_OFFSET_LARGEST:
-        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
-        break;
-      case SAMZA_OFFSET_SMALLEST:
-        newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
-        break;
-      default:
-        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
    }
-    LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset);
-    return newAutoOffsetReset;
-  }
-}
\ No newline at end of file
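The two normalization rules in this deleted helper, offset-reset translation and client-id sanitization, are easiest to see in isolation. Below is a minimal, hypothetical re-statement of both; the job name "my job!" and the "kafka-consumer" prefix are illustrative inputs only, not values from the commit.

    // Sketch of the two normalization rules; inputs are hypothetical.
    public class KafkaConfigTranslationSketch {
      // Mirrors getAutoOffsetResetValue: Samza "largest"/"smallest" map to Kafka
      // "latest"/"earliest"; Kafka-native values pass through; anything else
      // (including null) falls back to "latest".
      static String toKafkaOffsetReset(String samzaValue) {
        if (samzaValue == null) {
          return "latest";
        }
        switch (samzaValue) {
          case "earliest":
          case "latest":
          case "none":
            return samzaValue; // already a Kafka value
          case "smallest":
            return "earliest";
          case "largest":
          default:
            return "latest";
        }
      }

      public static void main(String[] args) {
        System.out.println(toKafkaOffsetReset("largest"));  // latest
        System.out.println(toKafkaOffsetReset("smallest")); // earliest
        // Client ids replace every non-word character with an underscore:
        System.out.println(String.format("%s-%s-%s",
            "kafka-consumer".replaceAll("\\W", "_"),
            "my job!".replaceAll("\\W", "_"),
            "1".replaceAll("\\W", "_")));
        // prints: kafka_consumer-my_job_-1
      }
    }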
http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 04071c1..e47add7 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -90,6 +90,8 @@ class KafkaConsumerProxy<K, V> {
     consumerPollThread.setDaemon(true);
     consumerPollThread.setName(
         "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName);
+
+    LOG.info("Creating KafkaConsumerProxy with systemName={}, clientId={}, metricName={}", systemName, clientId, metricName);
   }

  /**