http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala deleted file mode 100644 index 6ab4d32..0000000 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ /dev/null @@ -1,608 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.system.kafka - -import java.util -import java.util.{Properties, UUID} - -import com.google.common.annotations.VisibleForTesting -import kafka.admin.{AdminClient, AdminUtils} -import kafka.api._ -import kafka.common.TopicAndPartition -import kafka.consumer.{ConsumerConfig, SimpleConsumer} -import kafka.utils.ZkUtils -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.common.errors.TopicExistsException -import org.apache.kafka.common.TopicPartition -import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata -import org.apache.samza.system._ -import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, Logging} -import org.apache.samza.{Partition, SamzaException} - -import scala.collection.JavaConverters._ - - -object KafkaSystemAdmin extends Logging { - - @VisibleForTesting @volatile var deleteMessagesCalled = false - val CLEAR_STREAM_RETRIES = 3 - - /** - * A helper method that takes oldest, newest, and upcoming offsets for each - * system stream partition, and creates a single map from stream name to - * SystemStreamMetadata. - */ - def assembleMetadata(oldestOffsets: Map[SystemStreamPartition, String], newestOffsets: Map[SystemStreamPartition, String], upcomingOffsets: Map[SystemStreamPartition, String]): Map[String, SystemStreamMetadata] = { - val allMetadata = (oldestOffsets.keySet ++ newestOffsets.keySet ++ upcomingOffsets.keySet) - .groupBy(_.getStream) - .map { - case (streamName, systemStreamPartitions) => - val streamPartitionMetadata = systemStreamPartitions - .map(systemStreamPartition => { - val partitionMetadata = new SystemStreamPartitionMetadata( - // If the topic/partition is empty then oldest and newest will - // be stripped of their offsets, so default to null. 
- oldestOffsets.getOrElse(systemStreamPartition, null), - newestOffsets.getOrElse(systemStreamPartition, null), - upcomingOffsets(systemStreamPartition)) - (systemStreamPartition.getPartition, partitionMetadata) - }) - .toMap - val streamMetadata = new SystemStreamMetadata(streamName, streamPartitionMetadata.asJava) - (streamName, streamMetadata) - } - .toMap - - // This is typically printed downstream and it can be spammy, so debug level here. - debug("Got metadata: %s" format allMetadata) - - allMetadata - } -} - -/** - * A helper class that is used to construct the changelog stream specific information - * - * @param replicationFactor The number of replicas for the changelog stream - * @param kafkaProps The kafka specific properties that need to be used for changelog stream creation - */ -case class ChangelogInfo(var replicationFactor: Int, var kafkaProps: Properties) - -/** - * A Kafka-based implementation of SystemAdmin. - */ -class KafkaSystemAdmin( - /** - * The system name to use when creating SystemStreamPartitions to return in - * the getSystemStreamMetadata response. - */ - systemName: String, - - // TODO whenever Kafka decides to make the Set[Broker] class public, let's switch to Set[Broker] here. - /** - * List of brokers that are part of the Kafka system that we wish to - * interact with. The format is host1:port1,host2:port2. - */ - brokerListString: String, - - /** - * A method that returns a ZkUtils for the Kafka system. This is invoked - * when the system admin is attempting to create a coordinator stream. - */ - connectZk: () => ZkUtils, - - /** - * Custom properties to use when the system admin tries to create a new - * coordinator stream. - */ - coordinatorStreamProperties: Properties = new Properties, - - /** - * The replication factor to use when the system admin creates a new - * coordinator stream. - */ - coordinatorStreamReplicationFactor: Int = 1, - - /** - * The timeout to use for the simple consumer when fetching metadata from - * Kafka. Equivalent to Kafka's socket.timeout.ms configuration. - */ - timeout: Int = Int.MaxValue, - - /** - * The buffer size to use for the simple consumer when fetching metadata - * from Kafka. Equivalent to Kafka's socket.receive.buffer.bytes - * configuration. - */ - bufferSize: Int = ConsumerConfig.SocketBufferSize, - - /** - * The client ID to use for the simple consumer when fetching metadata from - * Kafka. Equivalent to Kafka's client.id configuration. 
- */ - clientId: String = UUID.randomUUID.toString, - - /** - * Replication factor for the Changelog topic in kafka - * Kafka properties to be used during the Changelog topic creation - */ - topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo](), - - /** - * Kafka properties to be used during the intermediate topic creation - */ - intermediateStreamProperties: Map[String, Properties] = Map(), - - /** - * Whether deleteMessages() API can be used - */ - deleteCommittedMessages: Boolean = false) extends ExtendedSystemAdmin with Logging { - - import KafkaSystemAdmin._ - - @volatile var running = false - @volatile var adminClient: AdminClient = null - - override def start() = { - if (!running) { - running = true - adminClient = createAdminClient() - } - } - - override def stop() = { - if (running) { - running = false - adminClient.close() - adminClient = null - } - } - - private def createAdminClient(): AdminClient = { - val props = new Properties() - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerListString) - AdminClient.create(props) - } - - override def getSystemStreamPartitionCounts(streams: util.Set[String], cacheTTL: Long): util.Map[String, SystemStreamMetadata] = { - getSystemStreamPartitionCounts(streams, new ExponentialSleepStrategy(initialDelayMs = 500), cacheTTL) - } - - def getSystemStreamPartitionCounts(streams: util.Set[String], retryBackoff: ExponentialSleepStrategy, cacheTTL: Long = Long.MaxValue): util.Map[String, SystemStreamMetadata] = { - debug("Fetching system stream partition count for: %s" format streams) - var metadataTTL = cacheTTL - retryBackoff.run( - loop => { - val metadata = TopicMetadataCache.getTopicMetadata( - streams.asScala.toSet, - systemName, - getTopicMetadata, - metadataTTL) - val result = metadata.map { - case (topic, topicMetadata) => { - KafkaUtil.maybeThrowException(topicMetadata.error.exception()) - val partitionsMap = topicMetadata.partitionsMetadata.map { - pm => - new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "") - }.toMap[Partition, SystemStreamPartitionMetadata] - (topic -> new SystemStreamMetadata(topic, partitionsMap.asJava)) - } - } - loop.done - result.asJava - }, - - (exception, loop) => { - warn("Unable to fetch last offsets for streams %s due to %s. Retrying." format (streams, exception)) - debug("Exception detail:", exception) - if (metadataTTL == Long.MaxValue) { - metadataTTL = 5000 // Revert to the default cache expiration - } - } - ).getOrElse(throw new SamzaException("Failed to get system stream metadata")) - } - - /** - * Returns the offset for the message after the specified offset for each - * SystemStreamPartition that was passed in. - */ - - override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = { - // This is safe to do with Kafka, even if a topic is key-deduped. If the - // offset doesn't exist on a compacted topic, Kafka will return the first - // message AFTER the offset that was specified in the fetch request. - offsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava - } - - override def getSystemStreamMetadata(streams: java.util.Set[String]) = - getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500)).asJava - - /** - * Given a set of stream names (topics), fetch metadata from Kafka for each - * stream, and return a map from stream name to SystemStreamMetadata for - * each stream. 
This method will return null for oldest and newest offsets - * if a given SystemStreamPartition is empty. This method will block and - * retry indefinitely until it gets a successful response from Kafka. - */ - def getSystemStreamMetadata(streams: java.util.Set[String], retryBackoff: ExponentialSleepStrategy) = { - debug("Fetching system stream metadata for: %s" format streams) - var metadataTTL = Long.MaxValue // Trust the cache until we get an exception - retryBackoff.run( - loop => { - val metadata = TopicMetadataCache.getTopicMetadata( - streams.asScala.toSet, - systemName, - getTopicMetadata, - metadataTTL) - - debug("Got metadata for streams: %s" format metadata) - - val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata) - var oldestOffsets = Map[SystemStreamPartition, String]() - var newestOffsets = Map[SystemStreamPartition, String]() - var upcomingOffsets = Map[SystemStreamPartition, String]() - - // Get oldest, newest, and upcoming offsets for each topic and partition. - for ((broker, topicsAndPartitions) <- brokersToTopicPartitions) { - debug("Fetching offsets for %s:%s: %s" format (broker.host, broker.port, topicsAndPartitions)) - - val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId) - try { - upcomingOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.LatestTime) - oldestOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.EarliestTime) - - // Kafka's "latest" offset is always last message in stream's offset + - // 1, so get newest message in stream by subtracting one. this is safe - // even for key-deduplicated streams, since the last message will - // never be deduplicated. - newestOffsets = upcomingOffsets.mapValues(offset => (offset.toLong - 1).toString) - // Keep only oldest/newest offsets where there is a message. Should - // return null offsets for empty streams. - upcomingOffsets.foreach { - case (topicAndPartition, offset) => - if (offset.toLong <= 0) { - debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition) - newestOffsets -= topicAndPartition - debug("Setting oldest offset to 0 to consume from beginning") - oldestOffsets += (topicAndPartition -> "0") - } - } - } finally { - consumer.close - } - } - - val result = assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets) - loop.done - result - }, - - (exception, loop) => { - warn("Unable to fetch last offsets for streams %s due to %s. Retrying." format (streams, exception)) - debug("Exception detail:", exception) - metadataTTL = 5000 // Revert to the default cache expiration - }).getOrElse(throw new SamzaException("Failed to get system stream metadata")) - } - - /** - * Returns the newest offset for the specified SSP. - * This method is fast and targeted. It minimizes the number of kafka requests. - * It does not retry indefinitely if there is any failure. - * It returns null if the topic is empty. 
To get the offsets for *all* - * partitions, it would be more efficient to call getSystemStreamMetadata - */ - override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = { - debug("Fetching newest offset for: %s" format ssp) - var offset: String = null - var metadataTTL = Long.MaxValue // Trust the cache until we get an exception - var retries = maxRetries - new ExponentialSleepStrategy().run( - loop => { - val metadata = TopicMetadataCache.getTopicMetadata( - Set(ssp.getStream), - systemName, - getTopicMetadata, - metadataTTL) - debug("Got metadata for streams: %s" format metadata) - - val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata) - val topicAndPartition = new TopicAndPartition(ssp.getStream, ssp.getPartition.getPartitionId) - val broker = brokersToTopicPartitions.filter((e) => e._2.contains(topicAndPartition)).head._1 - - // Get oldest, newest, and upcoming offsets for each topic and partition. - debug("Fetching offset for %s:%s: %s" format (broker.host, broker.port, topicAndPartition)) - val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId) - try { - offset = getOffsets(consumer, Set(topicAndPartition), OffsetRequest.LatestTime).head._2 - - // Kafka's "latest" offset is always last message in stream's offset + - // 1, so get newest message in stream by subtracting one. this is safe - // even for key-deduplicated streams, since the last message will - // never be deduplicated. - if (offset.toLong <= 0) { - debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition) - offset = null - } else { - offset = (offset.toLong - 1).toString - } - } finally { - consumer.close - } - - debug("Got offset %s for %s." format(offset, ssp)) - loop.done - }, - - (exception, loop) => { - if (retries > 0) { - warn("Exception while trying to get offset for %s: %s. Retrying." format(ssp, exception)) - metadataTTL = 0L // Force metadata refresh - retries -= 1 - } else { - warn("Exception while trying to get offset for %s" format(ssp), exception) - loop.done - throw exception - } - }) - - offset - } - - /** - * Helper method to use topic metadata cache when fetching metadata, so we - * don't hammer Kafka more than we need to. - */ - def getTopicMetadata(topics: Set[String]) = { - new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout) - .getTopicInfo(topics) - } - - /** - * Break topic metadata topic/partitions into per-broker map so that we can - * execute only one offset request per broker. - */ - private def getTopicsAndPartitionsByBroker(metadata: Map[String, TopicMetadata]) = { - val brokersToTopicPartitions = metadata - .values - // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)] - .flatMap(topicMetadata => { - KafkaUtil.maybeThrowException(topicMetadata.error.exception()) - topicMetadata - .partitionsMetadata - // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)] - .map(partitionMetadata => { - val topicAndPartition = new TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId) - val leader = partitionMetadata - .leader - .getOrElse(throw new SamzaException("Need leaders for all partitions when fetching offsets. 
No leader available for TopicAndPartition: %s" format topicAndPartition)) - (leader, topicAndPartition) - }) - }) - - // Convert to a Map[Broker, Seq[(Broker, TopicAndPartition)]] - .groupBy(_._1) - // Convert to a Map[Broker, Set[TopicAndPartition]] - .mapValues(_.map(_._2).toSet) - - debug("Got topic partition data for brokers: %s" format brokersToTopicPartitions) - - brokersToTopicPartitions - } - - /** - * Use a SimpleConsumer to fetch either the earliest or latest offset from - * Kafka for each topic/partition in the topicsAndPartitions set. It is - * assumed that all topics/partitions supplied reside on the broker that the - * consumer is connected to. - */ - private def getOffsets(consumer: SimpleConsumer, topicsAndPartitions: Set[TopicAndPartition], earliestOrLatest: Long) = { - debug("Getting offsets for %s using earliest/latest value of %s." format (topicsAndPartitions, earliestOrLatest)) - - var offsets = Map[SystemStreamPartition, String]() - val partitionOffsetInfo = topicsAndPartitions - .map(topicAndPartition => (topicAndPartition, PartitionOffsetRequestInfo(earliestOrLatest, 1))) - .toMap - val brokerOffsets = consumer - .getOffsetsBefore(new OffsetRequest(partitionOffsetInfo)) - .partitionErrorAndOffsets - .mapValues(partitionErrorAndOffset => { - KafkaUtil.maybeThrowException(partitionErrorAndOffset.error.exception()) - partitionErrorAndOffset.offsets.head - }) - - for ((topicAndPartition, offset) <- brokerOffsets) { - offsets += new SystemStreamPartition(systemName, topicAndPartition.topic, new Partition(topicAndPartition.partition)) -> offset.toString - } - - debug("Got offsets for %s using earliest/latest value of %s: %s" format (topicsAndPartitions, earliestOrLatest, offsets)) - - offsets - } - - /** - * @inheritdoc - */ - override def createStream(spec: StreamSpec): Boolean = { - info("Create topic %s in system %s" format (spec.getPhysicalName, systemName)) - val kSpec = toKafkaSpec(spec) - var streamCreated = false - - new ExponentialSleepStrategy(initialDelayMs = 500).run( - loop => { - val zkClient = connectZk() - try { - AdminUtils.createTopic( - zkClient, - kSpec.getPhysicalName, - kSpec.getPartitionCount, - kSpec.getReplicationFactor, - kSpec.getProperties) - } finally { - zkClient.close - } - - streamCreated = true - loop.done - }, - - (exception, loop) => { - exception match { - case e: TopicExistsException => - streamCreated = false - loop.done - case e: Exception => - warn("Failed to create topic %s: %s. Retrying." format (spec.getPhysicalName, e)) - debug("Exception detail:", e) - } - }) - - streamCreated - } - - /** - * Converts a StreamSpec into a KafkaStreamSpec. Special handling for coordinator and changelog stream. 
- * @param spec a StreamSpec object - * @return KafkaStreamSpec object - */ - def toKafkaSpec(spec: StreamSpec): KafkaStreamSpec = { - if (spec.isChangeLogStream) { - val topicName = spec.getPhysicalName - val topicMeta = topicMetaInformation.getOrElse(topicName, throw new StreamValidationException("Unable to find topic information for topic " + topicName)) - new KafkaStreamSpec(spec.getId, topicName, systemName, spec.getPartitionCount, topicMeta.replicationFactor, - topicMeta.kafkaProps) - } else if (spec.isCoordinatorStream){ - new KafkaStreamSpec(spec.getId, spec.getPhysicalName, systemName, 1, coordinatorStreamReplicationFactor, - coordinatorStreamProperties) - } else if (intermediateStreamProperties.contains(spec.getId)) { - KafkaStreamSpec.fromSpec(spec).copyWithProperties(intermediateStreamProperties(spec.getId)) - } else { - KafkaStreamSpec.fromSpec(spec) - } - } - - /** - * @inheritdoc - * - * Validates a stream in Kafka. Should not be called before createStream(), - * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients, - * is not read-only and will auto-create a new topic. - */ - override def validateStream(spec: StreamSpec): Unit = { - val topicName = spec.getPhysicalName - info("Validating topic %s." format topicName) - - val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy - var metadataTTL = Long.MaxValue // Trust the cache until we get an exception - retryBackoff.run( - loop => { - val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout) - val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo, metadataTTL) - val topicMetadata = topicMetadataMap(topicName) - KafkaUtil.maybeThrowException(topicMetadata.error.exception()) - - val partitionCount = topicMetadata.partitionsMetadata.length - if (partitionCount != spec.getPartitionCount) { - throw new StreamValidationException("Topic validation failed for topic %s because partition count %s did not match expected partition count of %d" format (topicName, topicMetadata.partitionsMetadata.length, spec.getPartitionCount)) - } - - info("Successfully validated topic %s." format topicName) - loop.done - }, - - (exception, loop) => { - exception match { - case e: StreamValidationException => throw e - case e: Exception => - warn("While trying to validate topic %s: %s. Retrying." format (topicName, e)) - debug("Exception detail:", e) - metadataTTL = 5000L // Revert to the default value - } - }) - } - - /** - * @inheritdoc - * - * Delete a stream in Kafka. Deleting topics works only when the broker is configured with "delete.topic.enable=true". - * Otherwise it's a no-op. - */ - override def clearStream(spec: StreamSpec): Boolean = { - info("Delete topic %s in system %s" format (spec.getPhysicalName, systemName)) - val kSpec = KafkaStreamSpec.fromSpec(spec) - var retries = CLEAR_STREAM_RETRIES - new ExponentialSleepStrategy().run( - loop => { - val zkClient = connectZk() - try { - AdminUtils.deleteTopic( - zkClient, - kSpec.getPhysicalName) - } finally { - zkClient.close - } - - loop.done - }, - - (exception, loop) => { - if (retries > 0) { - warn("Exception while trying to delete topic %s: %s. Retrying." 
format (spec.getPhysicalName, exception)) - retries -= 1 - } else { - warn("Failed to delete topic %s: %s" format (spec.getPhysicalName, exception)) - loop.done - throw exception - } - }) - - val topicMetadata = getTopicMetadata(Set(kSpec.getPhysicalName)).get(kSpec.getPhysicalName).get - topicMetadata.partitionsMetadata.isEmpty - } - - /** - * @inheritdoc - * - * Delete records up to (and including) the provided ssp offsets for all system stream partitions specified in the map. - * This only works with Kafka cluster 0.11 or later. Otherwise it's a no-op. - */ - override def deleteMessages(offsets: util.Map[SystemStreamPartition, String]) { - if (!running) { - throw new SamzaException(s"KafkaSystemAdmin has not started yet for system $systemName") - } - if (deleteCommittedMessages) { - val nextOffsets = offsets.asScala.toSeq.map { case (systemStreamPartition, offset) => - (new TopicPartition(systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), offset.toLong + 1) - }.toMap - adminClient.deleteRecordsBefore(nextOffsets) - deleteMessagesCalled = true - } - } - - /** - * Compare the two offsets. Returns x where x < 0 if offset1 < offset2; - * x == 0 if offset1 == offset2; x > 0 if offset1 > offset2. - * - * Currently it's used in the context of broadcast streams to detect - * a mismatch between two streams when consuming the broadcast streams. - */ - override def offsetComparator(offset1: String, offset2: String): Integer = { - offset1.toLong compare offset2.toLong - } -}
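The offset conventions encoded in the deleted admin above carry over to the new KafkaSystemAdminUtilsScala helpers below, so they are worth restating in isolation. The following is a minimal sketch of that arithmetic (a hypothetical OffsetConventions helper, not code from this commit), assuming offsets are strings that parse as longs:

    object OffsetConventions {
      // Kafka's "latest" offset is the upcoming offset: the offset the next
      // message written to the partition will receive. The newest existing
      // message is therefore one behind it; an upcoming offset of 0 means the
      // partition is empty, so there is no newest offset to report.
      def newest(upcoming: Long): String =
        if (upcoming <= 0) null else (upcoming - 1).toString

      // For an empty partition the admin pins oldest to "0" so a consumer
      // starts from the beginning once messages arrive.
      def oldest(earliest: Long, upcoming: Long): String =
        if (upcoming <= 0) "0" else earliest.toString

      // getOffsetsAfter: one past the given offset. Safe even on compacted
      // topics, since fetching a missing offset returns the first message
      // after it.
      def after(offset: String): String = (offset.toLong + 1).toString
    }

For example, a partition whose upcoming offset is 5 has newest offset "4", while a partition whose upcoming offset is 0 reports a null newest offset and an oldest offset of "0" -- exactly the values assembleMetadata then packages into SystemStreamPartitionMetadata.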
http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdminUtilsScala.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdminUtilsScala.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdminUtilsScala.scala new file mode 100644 index 0000000..6ff2b50 --- /dev/null +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdminUtilsScala.scala @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kafka + +import java.util +import java.util.Properties + +import kafka.admin.{AdminClient, AdminUtils} +import kafka.utils.{Logging, ZkUtils} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.TopicExistsException +import org.apache.samza.config.ApplicationConfig.ApplicationMode +import org.apache.samza.config.{ApplicationConfig, Config, KafkaConfig, StreamConfig} +import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata +import org.apache.samza.system.{StreamSpec, SystemStreamMetadata, SystemStreamPartition} +import org.apache.samza.util.ExponentialSleepStrategy +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ + +/** + * A helper class that is used to construct the changelog stream specific information + * + * @param replicationFactor The number of replicas for the changelog stream + * @param kafkaProps The kafka specific properties that need to be used for changelog stream creation + */ +case class ChangelogInfo(var replicationFactor: Int, var kafkaProps: Properties) + + +// TODO move to org.apache.kafka.clients.admin.AdminClient from the kafka.admin.AdminClient +object KafkaSystemAdminUtilsScala extends Logging { + + val CLEAR_STREAM_RETRIES = 3 + val CREATE_STREAM_RETRIES = 10 + + /** + * @inheritdoc + * + * Delete a stream in Kafka. Deleting topics works only when the broker is configured with "delete.topic.enable=true". + * Otherwise it's a no-op. + */ + def clearStream(spec: StreamSpec, connectZk: java.util.function.Supplier[ZkUtils]): Unit = { + info("Deleting topic %s for system %s" format(spec.getPhysicalName, spec.getSystemName)) + val kSpec = KafkaStreamSpec.fromSpec(spec) + var retries = CLEAR_STREAM_RETRIES + new ExponentialSleepStrategy().run( + loop => { + val zkClient = connectZk.get() + try { + AdminUtils.deleteTopic( + zkClient, + kSpec.getPhysicalName) + } finally { + zkClient.close + } + + loop.done + }, + + (exception, loop) => { + if (retries > 0) { + warn("Exception while trying to delete topic %s. Retrying." 
format (spec.getPhysicalName), exception) + retries -= 1 + } else { + warn("Failed to delete topic %s." format (spec.getPhysicalName), exception) + loop.done + throw exception + } + }) + } + + + def createStream(kSpec: KafkaStreamSpec, connectZk: java.util.function.Supplier[ZkUtils]): Boolean = { + info("Creating topic %s for system %s" format(kSpec.getPhysicalName, kSpec.getSystemName)) + var streamCreated = false + var retries = CREATE_STREAM_RETRIES + + new ExponentialSleepStrategy(initialDelayMs = 500).run( + loop => { + val zkClient = connectZk.get() + try { + AdminUtils.createTopic( + zkClient, + kSpec.getPhysicalName, + kSpec.getPartitionCount, + kSpec.getReplicationFactor, + kSpec.getProperties) + } finally { + zkClient.close + } + + streamCreated = true + loop.done + }, + + (exception, loop) => { + exception match { + case e: TopicExistsException => + streamCreated = false + loop.done + case e: Exception => + if (retries > 0) { + warn("Failed to create topic %s. Retrying." format (kSpec.getPhysicalName), exception) + retries -= 1 + } else { + error("Failed to create topic %s. Bailing out." format (kSpec.getPhysicalName), exception) + throw exception + } + } + }) + + streamCreated + } + + /** + * A helper method that takes oldest, newest, and upcoming offsets for each + * system stream partition, and creates a single map from stream name to + * SystemStreamMetadata. + */ + def assembleMetadata(oldestOffsets: Map[SystemStreamPartition, String], newestOffsets: Map[SystemStreamPartition, String], upcomingOffsets: Map[SystemStreamPartition, String]): Map[String, SystemStreamMetadata] = { + val allMetadata = (oldestOffsets.keySet ++ newestOffsets.keySet ++ upcomingOffsets.keySet) + .groupBy(_.getStream) + .map { + case (streamName, systemStreamPartitions) => + val streamPartitionMetadata = systemStreamPartitions + .map(systemStreamPartition => { + val partitionMetadata = new SystemStreamPartitionMetadata( + // If the topic/partition is empty then oldest and newest will + // be stripped of their offsets, so default to null. + oldestOffsets.getOrElse(systemStreamPartition, null), + newestOffsets.getOrElse(systemStreamPartition, null), + upcomingOffsets(systemStreamPartition)) + (systemStreamPartition.getPartition, partitionMetadata) + }) + .toMap + val streamMetadata = new SystemStreamMetadata(streamName, streamPartitionMetadata.asJava) + (streamName, streamMetadata) + } + .toMap + + // This is typically printed downstream and it can be spammy, so debug level here. 
+ debug("Got metadata: %s" format allMetadata) + + allMetadata + } + + def getCoordinatorTopicProperties(config: KafkaConfig) = { + val segmentBytes = config.getCoordinatorSegmentBytes + (new Properties /: Map( + "cleanup.policy" -> "compact", + "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props } + } + + def getIntermediateStreamProperties(config: Config): Map[String, Properties] = { + val appConfig = new ApplicationConfig(config) + if (appConfig.getAppMode == ApplicationMode.BATCH) { + val streamConfig = new StreamConfig(config) + streamConfig.getStreamIds().filter(streamConfig.getIsIntermediateStream(_)).map(streamId => { + val properties = new Properties() + properties.putAll(streamConfig.getStreamProperties(streamId)) + properties.putIfAbsent("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH)) + (streamId, properties) + }).toMap + } else { + Map() + } + } + + def deleteMessages(adminClient : AdminClient, offsets: util.Map[SystemStreamPartition, String]) = { + val nextOffsets = offsets.asScala.toSeq.map { case (systemStreamPartition, offset) => + (new TopicPartition(systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), offset.toLong + 1) + }.toMap + adminClient.deleteRecordsBefore(nextOffsets); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java deleted file mode 100644 index 10ce274..0000000 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java +++ /dev/null @@ -1,371 +0,0 @@ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -package org.apache.samza.system.kafka; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import kafka.common.TopicAndPartition; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.config.KafkaConfig; -import org.apache.samza.config.KafkaConsumerConfig; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.SystemConsumer; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.util.BlockingEnvelopeMap; -import org.apache.samza.util.Clock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Option; - - -public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements SystemConsumer { - - private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemConsumer.class); - - private static final long FETCH_THRESHOLD = 50000; - private static final long FETCH_THRESHOLD_BYTES = -1L; - - private final Consumer<K, V> kafkaConsumer; - private final String systemName; - private final String clientId; - private final AtomicBoolean stopped = new AtomicBoolean(false); - private final AtomicBoolean started = new AtomicBoolean(false); - private final Config config; - private final boolean fetchThresholdBytesEnabled; - private final KafkaSystemConsumerMetrics metrics; - - // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap. - final KafkaConsumerMessageSink messageSink; - - // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates - // BlockingEnvelopMap's buffers. - final private KafkaConsumerProxy proxy; - - // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets - final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>(); - final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>(); - - long perPartitionFetchThreshold; - long perPartitionFetchThresholdBytes; - - /** - * Create a KafkaSystemConsumer for the provided {@code systemName} - * @param systemName system name for which we create the consumer - * @param config application config - * @param metrics metrics for this KafkaSystemConsumer - * @param clock system clock - */ - public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId, - KafkaSystemConsumerMetrics metrics, Clock clock) { - - super(metrics.registry(), clock, metrics.getClass().getName()); - - this.kafkaConsumer = kafkaConsumer; - this.clientId = clientId; - this.systemName = systemName; - this.config = config; - this.metrics = metrics; - - fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName); - - // create a sink for passing the messages between the proxy and the consumer - messageSink = new KafkaConsumerMessageSink(); - - // Create the proxy to do the actual message reading. 
- String metricName = String.format("%s", systemName); - proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, metrics, metricName); - LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy); - } - - /** - * Create internal kafka consumer object, which will be used in the Proxy. - * @param systemName system name for which we create the consumer - * @param clientId client id to use in the kafka client - * @param config config - * @return kafka consumer object - */ - public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String systemName, String clientId, Config config) { - - // extract kafka client configs - KafkaConsumerConfig consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId); - - LOG.info("{}: KafkaClient properties {}", systemName, consumerConfig); - - return new KafkaConsumer(consumerConfig); - } - - @Override - public void start() { - if (!started.compareAndSet(false, true)) { - LOG.warn("{}: Attempting to start the consumer for the second (or more) time.", this); - return; - } - if (stopped.get()) { - LOG.error("{}: Attempting to start a stopped consumer", this); - return; - } - // initialize the subscriptions for all the registered TopicPartitions - startSubscription(); - // needs to be called after all the registrations are completed - setFetchThresholds(); - - startConsumer(); - LOG.info("{}: Consumer started", this); - } - - private void startSubscription() { - //subscribe to all the registered TopicPartitions - LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet()); - try { - synchronized (kafkaConsumer) { - // we are using assign (and not subscribe), so we need to specify both topic and partition - kafkaConsumer.assign(topicPartitionsToSSP.keySet()); - } - } catch (Exception e) { - throw new SamzaException("Consumer subscription failed for " + this, e); - } - } - - /** - * Set the offsets to start from. - * Register the TopicPartitions with the proxy. - * Start the proxy. - */ - void startConsumer() { - // set the offset for each TopicPartition - if (topicPartitionsToOffset.size() <= 0) { - LOG.error("{}: Consumer is not subscribed to any SSPs", this); - } - - topicPartitionsToOffset.forEach((tp, startingOffsetString) -> { - long startingOffset = Long.valueOf(startingOffsetString); - - try { - synchronized (kafkaConsumer) { - kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value - } - } catch (Exception e) { - // all recoverable exceptions are handled by the client. - // if we get here there is nothing left to do but bail out. - String msg = - String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, tp); - LOG.error(msg, e); - throw new SamzaException(msg, e); - } - - LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString); - - // add the partition to the proxy - proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset); - }); - - // start the proxy thread - if (proxy != null && !proxy.isRunning()) { - LOG.info("{}: Starting proxy {}", this, proxy); - proxy.start(); - } - } - - private void setFetchThresholds() { - // get the thresholds, and set defaults if not defined. 
- KafkaConfig kafkaConfig = new KafkaConfig(config); - - Option<String> fetchThresholdOption = kafkaConfig.getConsumerFetchThreshold(systemName); - long fetchThreshold = FETCH_THRESHOLD; - if (fetchThresholdOption.isDefined()) { - fetchThreshold = Long.valueOf(fetchThresholdOption.get()); - } - - Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName); - long fetchThresholdBytes = FETCH_THRESHOLD_BYTES; - if (fetchThresholdBytesOption.isDefined()) { - fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get()); - } - - int numPartitions = topicPartitionsToSSP.size(); - if (numPartitions != topicPartitionsToOffset.size()) { - throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()"); - } - - - if (numPartitions > 0) { - perPartitionFetchThreshold = fetchThreshold / numPartitions; - if (fetchThresholdBytesEnabled) { - // currently this feature cannot be enabled, because we do not have the size of the messages available. - // messages get double buffered, hence divide by 2 - perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numPartitions; - } - } - LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; numPartitions={}, perPartitionFetchThreshold={}, perPartitionFetchThresholdBytes(0 if disabled)={}", - this, fetchThresholdBytes, fetchThreshold, numPartitions, perPartitionFetchThreshold, perPartitionFetchThresholdBytes); - } - - @Override - public void stop() { - if (!stopped.compareAndSet(false, true)) { - LOG.warn("{}: Attempting to stop stopped consumer.", this); - return; - } - - LOG.info("{}: Stopping Samza kafkaConsumer ", this); - - // stop the proxy (with 1 minute timeout) - if (proxy != null) { - LOG.info("{}: Stopping proxy {}", this, proxy); - proxy.stop(TimeUnit.SECONDS.toMillis(60)); - } - - try { - synchronized (kafkaConsumer) { - LOG.info("{}: Closing kafkaSystemConsumer {}", this, kafkaConsumer); - kafkaConsumer.close(); - } - } catch (Exception e) { - LOG.warn("{}: Failed to stop KafkaSystemConsumer.", this, e); - } - } - - /** - * Record the ssp and the offset. Do not submit it to the consumer yet. - * @param systemStreamPartition ssp to register - * @param offset offset to register with - */ - @Override - public void register(SystemStreamPartition systemStreamPartition, String offset) { - if (started.get()) { - String msg = String.format("%s: Trying to register partition after consumer has been started. ssp=%s", this, - systemStreamPartition); - throw new SamzaException(msg); - } - - if (!systemStreamPartition.getSystem().equals(systemName)) { - LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition); - return; - } - LOG.info("{}: Registering ssp = {} with offset {}", this, systemStreamPartition, offset); - - super.register(systemStreamPartition, offset); - - TopicPartition tp = toTopicPartition(systemStreamPartition); - - topicPartitionsToSSP.put(tp, systemStreamPartition); - - String existingOffset = topicPartitionsToOffset.get(tp); - // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages. - if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) { - topicPartitionsToOffset.put(tp, offset); - } - - metrics.registerTopicAndPartition(toTopicAndPartition(tp)); - } - - /** - * Compare two String offsets. - * Note: there is a method in KafkaSystemAdmin that does this, but it would require instantiating a SystemAdmin for each consumer. 
- * @return see {@link Long#compareTo(Long)} - */ - private static int compareOffsets(String offset1, String offset2) { - return Long.valueOf(offset1).compareTo(Long.valueOf(offset2)); - } - - @Override - public String toString() { - return String.format("%s:%s", systemName, clientId); - } - - @Override - public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll( - Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException { - - // check if the proxy is running - if (!proxy.isRunning()) { - stop(); - String message = String.format("%s: KafkaConsumerProxy has stopped.", this); - throw new SamzaException(message, proxy.getFailureCause()); - } - - return super.poll(systemStreamPartitions, timeout); - } - - /** - * convert from TopicPartition to TopicAndPartition - */ - public static TopicAndPartition toTopicAndPartition(TopicPartition tp) { - return new TopicAndPartition(tp.topic(), tp.partition()); - } - - /** - * convert to TopicPartition from SystemStreamPartition - */ - public static TopicPartition toTopicPartition(SystemStreamPartition ssp) { - return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId()); - } - - /** - * return system name for this consumer - * @return system name - */ - public String getSystemName() { - return systemName; - } - - public class KafkaConsumerMessageSink { - - public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) { - setIsAtHead(ssp, isAtHighWatermark); - } - - boolean needsMoreMessages(SystemStreamPartition ssp) { - LOG.debug("{}: needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};" - + "(limit={}); messagesNumInQueue={}(limit={};", this, ssp, fetchThresholdBytesEnabled, - getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp), - perPartitionFetchThreshold); - - if (fetchThresholdBytesEnabled) { - return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes; - } else { - return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold; - } - } - - void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) { - LOG.trace("{}: Incoming message ssp = {}: envelope = {}.", this, ssp, envelope); - - try { - put(ssp, envelope); - } catch (InterruptedException e) { - throw new SamzaException( - String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s", this, - envelope.getOffset(), ssp)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index ba5390b..f314f92 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -21,13 +21,11 @@ package org.apache.samza.system.kafka import java.util.Properties -import kafka.utils.ZkUtils +import com.google.common.annotations.VisibleForTesting import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.samza.SamzaException import org.apache.samza.config.ApplicationConfig.ApplicationMode import org.apache.samza.config.KafkaConfig.Config2Kafka import org.apache.samza.config.StorageConfig._ -import 
org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.TaskConfig.Config2Task import org.apache.samza.config._ import org.apache.samza.metrics.MetricsRegistry @@ -35,32 +33,40 @@ import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, Syst import org.apache.samza.util._ object KafkaSystemFactory extends Logging { + @VisibleForTesting def getInjectedProducerProperties(systemName: String, config: Config) = if (config.isChangelogSystem(systemName)) { warn("System name '%s' is being used as a changelog. Disabling compression since Kafka does not support compression for log compacted topics." format systemName) Map[String, String]("compression.type" -> "none") } else { Map[String, String]() } + + val CLIENTID_PRODUCER_PREFIX = "kafka-producer" + val CLIENTID_CONSUMER_PREFIX = "kafka-consumer" + val CLIENTID_ADMIN_PREFIX = "kafka-admin-consumer" } class KafkaSystemFactory extends SystemFactory with Logging { def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = { - val clientId = KafkaConsumerConfig.getConsumerClientId( config) val metrics = new KafkaSystemConsumerMetrics(systemName, registry) - val kafkaConsumer = KafkaSystemConsumer.getKafkaConsumerImpl(systemName, clientId, config) + val clientId = KafkaConsumerConfig.createClientId(KafkaSystemFactory.CLIENTID_CONSUMER_PREFIX, config); + val kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId); + + val kafkaConsumer = KafkaSystemConsumer.createKafkaConsumerImpl[Array[Byte], Array[Byte]](systemName, kafkaConsumerConfig) info("Created kafka consumer for system %s, clientId %s: %s" format (systemName, clientId, kafkaConsumer)) - val kafkaSystemConsumer = new KafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, new SystemClock) - info("Created samza system consumer %s" format (kafkaSystemConsumer.toString)) + val kafkaSystemConsumer = new KafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, + new SystemClock) + info("Created samza system consumer for system %s, config %s: %s" format(systemName, config, kafkaSystemConsumer)) kafkaSystemConsumer } def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = { - val clientId = KafkaConsumerConfig.getProducerClientId(config) val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config) + val clientId = KafkaConsumerConfig.createClientId(KafkaSystemFactory.CLIENTID_PRODUCER_PREFIX, config); val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps) val getProducer = () => { new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) @@ -70,6 +76,7 @@ class KafkaSystemFactory extends SystemFactory with Logging { // Unlike consumer, no need to use encoders here, since they come for free // inside the producer configs. Kafka's producer will handle all of this // for us. 
+ info("Creating kafka producer for system %s, producerClientId %s" format(systemName, clientId)) new KafkaSystemProducer( systemName, @@ -80,43 +87,11 @@ class KafkaSystemFactory extends SystemFactory with Logging { } def getAdmin(systemName: String, config: Config): SystemAdmin = { - val clientId = KafkaConsumerConfig.getAdminClientId(config) - val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId) - val bootstrapServers = producerConfig.bootsrapServers - val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId) - val timeout = consumerConfig.socketTimeoutMs - val bufferSize = consumerConfig.socketReceiveBufferBytes - val zkConnect = Option(consumerConfig.zkConnect) - .getOrElse(throw new SamzaException("no zookeeper.connect defined in config")) - val connectZk = () => { - ZkUtils(zkConnect, 6000, 6000, false) - } - val coordinatorStreamProperties = getCoordinatorTopicProperties(config) - val coordinatorStreamReplicationFactor = config.getCoordinatorReplicationFactor.toInt - val storeToChangelog = config.getKafkaChangelogEnabledStores() - // Construct the meta information for each topic, if the replication factor is not defined, we use 2 as the number of replicas for the change log stream. - val topicMetaInformation = storeToChangelog.map { case (storeName, topicName) => { - val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).toInt - val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName)) - info("Creating topic meta information for topic: %s with replication factor: %s" format(topicName, replicationFactor)) - (topicName, changelogInfo) - } - } + // extract kafka client configs + val clientId = KafkaConsumerConfig.createClientId(KafkaSystemFactory.CLIENTID_ADMIN_PREFIX, config); + val consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId) - val deleteCommittedMessages = config.deleteCommittedMessages(systemName).exists(isEnabled => isEnabled.toBoolean) - val intermediateStreamProperties: Map[String, Properties] = getIntermediateStreamProperties(config) - new KafkaSystemAdmin( - systemName, - bootstrapServers, - connectZk, - coordinatorStreamProperties, - coordinatorStreamReplicationFactor, - timeout, - bufferSize, - clientId, - topicMetaInformation, - intermediateStreamProperties, - deleteCommittedMessages) + new KafkaSystemAdmin(systemName, config, KafkaSystemConsumer.createKafkaConsumerImpl(systemName, consumerConfig)) } def getCoordinatorTopicProperties(config: Config) = { http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala index 2d09301..90dfff3 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala @@ -37,17 +37,6 @@ object KafkaUtil extends Logging { val CHECKPOINT_LOG_VERSION_NUMBER = 1 val counter = new AtomicLong(0) - def getClientId(id: String, config: Config): String = getClientId( - id, - config.getName.getOrElse(throw new ConfigException("Missing job name.")), - config.getJobId) - - def getClientId(id: String, jobName: String, jobId: String): String = - "%s-%s-%s" format - (id.replaceAll("[^A-Za-z0-9]", "_"), - jobName.replaceAll("[^A-Za-z0-9]", 
"_"), - jobId.replaceAll("[^A-Za-z0-9]", "_")) - private def abs(n: Int) = if (n == Integer.MIN_VALUE) 0 else math.abs(n) def getIntegerPartitionKey(envelope: OutgoingMessageEnvelope, partitions: java.util.List[PartitionInfo]): Integer = { http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java index de5d093..62f6269 100644 --- a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java +++ b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java @@ -30,9 +30,11 @@ import org.junit.Test; public class TestKafkaConsumerConfig { public final static String SYSTEM_NAME = "testSystem"; + public final static String JOB_NAME = "jobName"; + public final static String JOB_ID = "jobId"; public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer."; public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer."; - private final static String CLIENT_ID = "clientId"; + private final static String CLIENT_ID_PREFIX = "consumer-client"; @Test public void testDefaults() { @@ -44,15 +46,16 @@ public class TestKafkaConsumerConfig { props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // should NOT be ignored - props.put(JobConfig.JOB_NAME(), "jobName"); + props.put(JobConfig.JOB_NAME(), JOB_NAME); // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignroeThis:9092"); props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092"); Config config = new MapConfig(props); + String clientId = KafkaConsumerConfig.createClientId(CLIENT_ID_PREFIX, config); KafkaConsumerConfig kafkaConsumerConfig = - KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID); + KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, clientId); Assert.assertEquals("false", kafkaConsumerConfig.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); @@ -71,21 +74,34 @@ public class TestKafkaConsumerConfig { Assert.assertEquals(ByteArrayDeserializer.class.getName(), kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); - Assert.assertEquals(CLIENT_ID, kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG)); + // validate group and client id generation + Assert.assertEquals(CLIENT_ID_PREFIX.replace("-", "_") + "-" + JOB_NAME + "-" + "1", + kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG)); + + Assert.assertEquals(CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-1", + KafkaConsumerConfig.createClientId(CLIENT_ID_PREFIX, config)); + + Assert.assertEquals("jobName-1", KafkaConsumerConfig.createConsumerGroupId(config)); + + // validate setting of group and client id + Assert.assertEquals(KafkaConsumerConfig.createConsumerGroupId(config), + kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG)); - Assert.assertEquals(KafkaConsumerConfig.getConsumerGroupId(config), + Assert.assertEquals(KafkaConsumerConfig.createConsumerGroupId(config), kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG)); - Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-1", - 
KafkaConsumerConfig.getConsumerClientId(config)); - Assert.assertEquals("jobName-1", KafkaConsumerConfig.getConsumerGroupId(config)); - props.put(JobConfig.JOB_ID(), "jobId"); + Assert.assertEquals(KafkaConsumerConfig.createClientId(CLIENT_ID_PREFIX, config), + kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG)); + + // with non-default job id + props.put(JobConfig.JOB_ID(), JOB_ID); config = new MapConfig(props); + Assert.assertEquals(CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-jobId", + KafkaConsumerConfig.createClientId(CLIENT_ID_PREFIX, config)); + + Assert.assertEquals("jobName-jobId", KafkaConsumerConfig.createConsumerGroupId(config)); - Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-jobId", - KafkaConsumerConfig.getConsumerClientId(config)); - Assert.assertEquals("jobName-jobId", KafkaConsumerConfig.getConsumerGroupId(config)); } // test stuff that should not be overridden @@ -103,8 +119,9 @@ public class TestKafkaConsumerConfig { props.put(JobConfig.JOB_NAME(), "jobName"); Config config = new MapConfig(props); + String clientId = KafkaConsumerConfig.createClientId(CLIENT_ID_PREFIX, config); KafkaConsumerConfig kafkaConsumerConfig = - KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID); + KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, clientId); Assert.assertEquals("useThis:9092", kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); @@ -121,29 +138,30 @@ public class TestKafkaConsumerConfig { map.put(JobConfig.JOB_NAME(), "jobName"); map.put(JobConfig.JOB_ID(), "jobId"); - String result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map)); + + String result = KafkaConsumerConfig.createClientId("consumer", new MapConfig(map)); Assert.assertEquals("consumer-jobName-jobId", result); - result = KafkaConsumerConfig.getConsumerClientId("consumer-", new MapConfig(map)); + result = KafkaConsumerConfig.createClientId("consumer-", new MapConfig(map)); Assert.assertEquals("consumer_-jobName-jobId", result); - result = KafkaConsumerConfig.getConsumerClientId("super-duper-consumer", new MapConfig(map)); + result = KafkaConsumerConfig.createClientId("super-duper-consumer", new MapConfig(map)); Assert.assertEquals("super_duper_consumer-jobName-jobId", result); map.put(JobConfig.JOB_NAME(), " very important!job"); - result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map)); + result = KafkaConsumerConfig.createClientId("consumer", new MapConfig(map)); Assert.assertEquals("consumer-_very_important_job-jobId", result); map.put(JobConfig.JOB_ID(), "number-#3"); - result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map)); + result = KafkaConsumerConfig.createClientId("consumer", new MapConfig(map)); Assert.assertEquals("consumer-_very_important_job-number__3", result); } @Test(expected = SamzaException.class) public void testNoBootstrapServers() { - KafkaConsumerConfig kafkaConsumerConfig = - KafkaConsumerConfig.getKafkaSystemConsumerConfig(new MapConfig(Collections.emptyMap()), SYSTEM_NAME, - "clientId"); + Config config = new MapConfig(Collections.emptyMap()); + String clientId = KafkaConsumerConfig.createClientId("clientId", config); + KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, clientId); Assert.fail("didn't get exception for the missing config:" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); } 
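The expected strings asserted above all follow one construction rule, the same one the removed KafkaUtil.getClientId used. As a reading aid, here is a sketch of that rule (an assumption reconstructed from the test expectations, not the KafkaConsumerConfig source): every character outside [A-Za-z0-9] in each component is replaced with an underscore, and the prefix, job name, and job id are then joined with dashes; the consumer group id is simply jobName-jobId, with the job id defaulting to "1".

    // Hypothetical restatement of the client-id rule the assertions encode.
    def clientId(prefix: String, jobName: String, jobId: String): String =
      Seq(prefix, jobName, jobId)
        .map(_.replaceAll("[^A-Za-z0-9]", "_"))
        .mkString("-")

    // clientId("super-duper-consumer", "jobName", "jobId")
    //   == "super_duper_consumer-jobName-jobId"
    // clientId("consumer", " very important!job", "number-#3")
    //   == "consumer-_very_important_job-number__3"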
http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 7e968bf..27601b0 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -19,14 +19,22 @@

 package org.apache.samza.system.kafka;

+import com.google.common.collect.ImmutableSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import kafka.api.TopicMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.samza.Partition;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.util.ScalaJavaUtil;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;

@@ -36,6 +44,97 @@ import static org.junit.Assert.*;
 public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {

   @Test
+  public void testGetOffsetsAfter() {
+    SystemStreamPartition ssp1 = new SystemStreamPartition(SYSTEM(), TOPIC(), new Partition(0));
+    SystemStreamPartition ssp2 = new SystemStreamPartition(SYSTEM(), TOPIC(), new Partition(1));
+    Map<SystemStreamPartition, String> offsets = new HashMap<>();
+    offsets.put(ssp1, "1");
+    offsets.put(ssp2, "2");
+
+    offsets = systemAdmin().getOffsetsAfter(offsets);
+
+    Assert.assertEquals("2", offsets.get(ssp1));
+    Assert.assertEquals("3", offsets.get(ssp2));
+  }
+
+  @Test
+  public void testToKafkaSpec() {
+    String topicName = "testStream";
+
+    int defaultPartitionCount = 2;
+    int changeLogPartitionFactor = 5;
+    Map<String, String> map = new HashMap<>();
+    Config config = new MapConfig(map);
+    StreamSpec spec = new StreamSpec("id", topicName, SYSTEM(), defaultPartitionCount, config);
+
+    KafkaSystemAdmin kafkaAdmin = systemAdmin();
+    KafkaStreamSpec kafkaSpec = kafkaAdmin.toKafkaSpec(spec);
+
+    Assert.assertEquals("id", kafkaSpec.getId());
+    Assert.assertEquals(topicName, kafkaSpec.getPhysicalName());
+    Assert.assertEquals(SYSTEM(), kafkaSpec.getSystemName());
+    Assert.assertEquals(defaultPartitionCount, kafkaSpec.getPartitionCount());
+
+    // validate that conversion is using coordinator metadata
+    map.put("job.coordinator.segment.bytes", "123");
+    map.put("job.coordinator.cleanup.policy", "superCompact");
+    int coordReplicationFactor = 4;
+    map.put(org.apache.samza.config.KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR(),
+        String.valueOf(coordReplicationFactor));
+
+    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(SYSTEM(), map));
+    spec = StreamSpec.createCoordinatorStreamSpec(topicName, SYSTEM());
+    kafkaSpec = admin.toKafkaSpec(spec);
+    Assert.assertEquals(coordReplicationFactor, kafkaSpec.getReplicationFactor());
+    Assert.assertEquals("123", kafkaSpec.getProperties().getProperty("segment.bytes"));
+    // cleanup policy is overridden in the KafkaAdmin
+    Assert.assertEquals("compact", kafkaSpec.getProperties().getProperty("cleanup.policy"));
+
+    // validate that conversion is using changeLog metadata
+    map = new HashMap<>();
+    map.put(JobConfig.JOB_DEFAULT_SYSTEM(), SYSTEM());
+
+    map.put(String.format("stores.%s.changelog", "fakeStore"), topicName);
+    int changeLogReplicationFactor = 3;
+    map.put(String.format("stores.%s.changelog.replication.factor", "fakeStore"),
+        String.valueOf(changeLogReplicationFactor));
+    admin = Mockito.spy(createSystemAdmin(SYSTEM(), map));
+    spec = StreamSpec.createChangeLogStreamSpec(topicName, SYSTEM(), changeLogPartitionFactor);
+    kafkaSpec = admin.toKafkaSpec(spec);
+    Assert.assertEquals(changeLogReplicationFactor, kafkaSpec.getReplicationFactor());
+
+    // same, but with missing topic info
+    try {
+      admin = Mockito.spy(createSystemAdmin(SYSTEM(), map));
+      spec = StreamSpec.createChangeLogStreamSpec("anotherTopic", SYSTEM(), changeLogPartitionFactor);
+      kafkaSpec = admin.toKafkaSpec(spec);
+      Assert.fail("toKafkaSpec should've failed for missing topic");
+    } catch (StreamValidationException e) {
+      // expected
+    }
+
+    // validate that conversion is using intermediate streams properties
+    String interStreamId = "isId";
+
+    Map<String, String> interStreamMap = new HashMap<>();
+    interStreamMap.put("app.mode", ApplicationConfig.ApplicationMode.BATCH.toString());
+    interStreamMap.put(String.format("streams.%s.samza.intermediate", interStreamId), "true");
+    interStreamMap.put(String.format("streams.%s.samza.system", interStreamId), "testSystem");
+    interStreamMap.put(String.format("streams.%s.p1", interStreamId), "v1");
+    interStreamMap.put(String.format("streams.%s.retention.ms", interStreamId), "123");
+    // legacy format
+    interStreamMap.put(String.format("systems.%s.streams.%s.p2", "testSystem", interStreamId), "v2");
+
+    admin = Mockito.spy(createSystemAdmin(SYSTEM(), interStreamMap));
+    spec = new StreamSpec(interStreamId, topicName, SYSTEM(), defaultPartitionCount, config);
+    kafkaSpec = admin.toKafkaSpec(spec);
+    Assert.assertEquals("v1", kafkaSpec.getProperties().getProperty("p1"));
+    Assert.assertEquals("v2", kafkaSpec.getProperties().getProperty("p2"));
+    Assert.assertEquals("123", kafkaSpec.getProperties().getProperty("retention.ms"));
+    Assert.assertEquals(defaultPartitionCount, kafkaSpec.getPartitionCount());
+  }
+
+  @Test
   public void testCreateCoordinatorStream() {
     SystemAdmin admin = Mockito.spy(systemAdmin());
     StreamSpec spec = StreamSpec.createCoordinatorStreamSpec("testCoordinatorStream", "testSystem");
@@ -49,10 +148,14 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   public void testCreateCoordinatorStreamWithSpecialCharsInTopicName() {
     final String STREAM = "test.coordinator_test.Stream";

-    Properties coordProps = new Properties();
-    Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
+    Map<String, String> map = new HashMap<>();
+    map.put("job.coordinator.segment.bytes", "123");
+    map.put("job.coordinator.cleanup.policy", "compact");
+    int coordReplicationFactor = 2;
+    map.put(org.apache.samza.config.KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR(),
+        String.valueOf(coordReplicationFactor));

-    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
+    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(SYSTEM(), map));
     StreamSpec spec = StreamSpec.createCoordinatorStreamSpec(STREAM, SYSTEM());

     Mockito.doAnswer(invocationOnMock -> {
@@ -62,6 +165,10 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
       assertEquals(SYSTEM(), internalSpec.getSystemName());
       assertEquals(STREAM, internalSpec.getPhysicalName());
       assertEquals(1, internalSpec.getPartitionCount());
+      Assert.assertEquals(coordReplicationFactor, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
+      Assert.assertEquals("123", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("segment.bytes"));
+      // cleanup policy is overridden in the KafkaAdmin
+      Assert.assertEquals("compact", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("cleanup.policy"));

       return internalSpec;
     }).when(admin).toKafkaSpec(Mockito.any());

@@ -71,62 +178,38 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   }

   @Test
-  public void testCreateChangelogStream() {
-    final String STREAM = "testChangeLogStream";
-    final int PARTITIONS = 12;
-    final int REP_FACTOR = 1;
-
-    Properties coordProps = new Properties();
-    Properties changeLogProps = new Properties();
-    changeLogProps.setProperty("cleanup.policy", "compact");
-    changeLogProps.setProperty("segment.bytes", "139");
-    Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
-    changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
-
-    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
-    StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);
-
-    Mockito.doAnswer(invocationOnMock -> {
-      StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
-      assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
-      assertTrue(internalSpec.isChangeLogStream());
-      assertEquals(SYSTEM(), internalSpec.getSystemName());
-      assertEquals(STREAM, internalSpec.getPhysicalName());
-      assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
-      assertEquals(PARTITIONS, internalSpec.getPartitionCount());
-      assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
-
-      return internalSpec;
-    }).when(admin).toKafkaSpec(Mockito.any());
-
-    admin.createStream(spec);
-    admin.validateStream(spec);
+  public void testCreateChangelogStreamHelp() {
+    testCreateChangelogStreamHelp("testChangeLogStream");
   }

   @Test
   public void testCreateChangelogStreamWithSpecialCharsInTopicName() {
-    final String STREAM = "test.Change_Log.Stream";
+    // cannot contain period
+    testCreateChangelogStreamHelp("test-Change_Log-Stream");
+  }
+
+  public void testCreateChangelogStreamHelp(final String topic) {
     final int PARTITIONS = 12;
-    final int REP_FACTOR = 1;
+    final int REP_FACTOR = 2;

-    Properties coordProps = new Properties();
-    Properties changeLogProps = new Properties();
-    changeLogProps.setProperty("cleanup.policy", "compact");
-    changeLogProps.setProperty("segment.bytes", "139");
-    Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
-    changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
+    Map<String, String> map = new HashMap<>();
+    map.put(JobConfig.JOB_DEFAULT_SYSTEM(), SYSTEM());
+    map.put(String.format("stores.%s.changelog", "fakeStore"), topic);
+    map.put(String.format("stores.%s.changelog.replication.factor", "fakeStore"), String.valueOf(REP_FACTOR));
+    map.put(String.format("stores.%s.changelog.kafka.segment.bytes", "fakeStore"), "139");
+    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(SYSTEM(), map));
+    StreamSpec spec = StreamSpec.createChangeLogStreamSpec(topic, SYSTEM(), PARTITIONS);

-    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1,
-        ScalaJavaUtil.toScalaMap(changeLogMap)));
-    StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);

     Mockito.doAnswer(invocationOnMock -> {
       StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
       assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
       assertTrue(internalSpec.isChangeLogStream());
       assertEquals(SYSTEM(), internalSpec.getSystemName());
-      assertEquals(STREAM, internalSpec.getPhysicalName());
+      assertEquals(topic, internalSpec.getPhysicalName());
       assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
       assertEquals(PARTITIONS, internalSpec.getPartitionCount());
-      assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
+      assertEquals("139", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("segment.bytes"));
+      assertEquals("compact", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("cleanup.policy"));

       return internalSpec;
     }).when(admin).toKafkaSpec(Mockito.any());

@@ -176,7 +259,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     systemAdmin().validateStream(spec2);
   }

-  @Test
+  //@Test //TODO - currently the connection to ZK fails, but since it checks for empty, the test succeeds. SAMZA-1887
   public void testClearStream() {
     StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8);

@@ -184,8 +267,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
         systemAdmin().createStream(spec));
     assertTrue(systemAdmin().clearStream(spec));

-    scala.collection.immutable.Set<String> topic = new scala.collection.immutable.Set.Set1<>(spec.getPhysicalName());
-    scala.collection.immutable.Map<String, TopicMetadata> metadata = systemAdmin().getTopicMetadata(topic);
-    assertTrue(metadata.get(spec.getPhysicalName()).get().partitionsMetadata().isEmpty());
+    ImmutableSet<String> topics = ImmutableSet.of(spec.getPhysicalName());
+    Map<String, List<PartitionInfo>> metadata = systemAdmin().getTopicMetadata(topics);
+    assertTrue(metadata.get(spec.getPhysicalName()).isEmpty());
   }
 }
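One behavioral contract worth calling out from testGetOffsetsAfter at the top of this diff: getOffsetsAfter maps each supplied offset to the one immediately after it ("1" becomes "2", "2" becomes "3"), i.e. string-encoded long arithmetic per SystemStreamPartition. A tiny standalone sketch of that contract, with keys simplified to plain strings so it runs on its own (the real API keys by SystemStreamPartition; this illustrates the asserted behavior, not Samza's implementation):

import java.util.HashMap;
import java.util.Map;

public class OffsetsAfterSketch {
  // Parse each offset string as a long and return the offset one past it.
  static Map<String, String> offsetsAfter(Map<String, String> lastReadOffsets) {
    Map<String, String> next = new HashMap<>();
    lastReadOffsets.forEach((ssp, offset) -> next.put(ssp, String.valueOf(Long.parseLong(offset) + 1)));
    return next;
  }

  public static void main(String[] args) {
    Map<String, String> offsets = new HashMap<>();
    offsets.put("TOPIC-partition0", "1");
    offsets.put("TOPIC-partition1", "2");
    // prints the incremented offsets, e.g. {TOPIC-partition0=2, TOPIC-partition1=3} (ordering may vary)
    System.out.println(offsetsAfter(offsets));
  }
}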
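The rewritten changelog tests also document how store-level config now flows into topic properties: "stores.<store>.changelog.kafka.<prop>" entries surface as Kafka topic properties (segment.bytes = "139" above), and cleanup.policy comes back as "compact" for changelog topics even though the test never sets it. A rough sketch of that key-stripping step, under the assumption that the mapping is a simple prefix strip plus a compact default (illustrative only; the real code path lives in KafkaConfig/KafkaSystemAdmin):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ChangelogPropsSketch {
  // Collect "stores.<store>.changelog.kafka.*" entries into topic Properties,
  // applying cleanup.policy=compact for the changelog topic.
  static Properties changelogTopicProperties(String store, Map<String, String> config) {
    String prefix = String.format("stores.%s.changelog.kafka.", store);
    Properties props = new Properties();
    for (Map.Entry<String, String> e : config.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        props.setProperty(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    props.setProperty("cleanup.policy", "compact"); // changelog topics end up compacted
    return props;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("stores.fakeStore.changelog.kafka.segment.bytes", "139");
    Properties props = changelogTopicProperties("fakeStore", config);
    System.out.println(props.getProperty("segment.bytes"));  // 139
    System.out.println(props.getProperty("cleanup.policy")); // compact
  }
}

This mirrors what the doAnswer assertions in testCreateChangelogStreamHelp check on the KafkaStreamSpec, but the actual precedence rules (e.g. whether a user-supplied cleanup.policy could win) are not exercised by these tests.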
