Repository: samza Updated Branches: refs/heads/0.13.2 fb39a5142 -> 90fa985ec
SAMZA-1427; use systemFactory in checkpoint manager. Author: Boris S <[email protected]> Author: Boris Shkolnik <[email protected]> Author: Boris Shkolnik <[email protected]> Reviewers: Jagadish Venkatraman <[email protected]>, Prateek Maheshwari <[email protected]> Closes #299 from sborya/LiKafkaClient Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8ee61a68 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8ee61a68 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8ee61a68 Branch: refs/heads/0.13.2 Commit: 8ee61a68f5f1f81a25854ad5b00c38e171878b7d Parents: fb39a51 Author: Boris S <[email protected]> Authored: Wed Oct 4 10:48:58 2017 -0700 Committer: Boris S <[email protected]> Committed: Wed Oct 4 14:18:57 2017 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/samza/util/Util.scala | 27 +- .../kafka/KafkaCheckpointManager.scala | 254 +++++++++---------- .../kafka/KafkaCheckpointManagerFactory.scala | 49 ++-- .../org/apache/samza/config/KafkaConfig.scala | 4 +- .../kafka/TestKafkaCheckpointManager.scala | 94 +++++-- .../system/kafka/TestKafkaSystemAdmin.scala | 2 +- .../test/integration/StreamTaskTestUtil.scala | 2 +- 7 files changed, 242 insertions(+), 190 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/8ee61a68/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 6c224e6..b2c81de 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -19,29 +19,22 @@ package org.apache.samza.util -import java.net._ import java.io._ import java.lang.management.ManagementFactory -import java.util.zip.CRC32 -import org.apache.samza.{SamzaException, Partition} -import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream} +import java.net._ import java.util.Random +import java.util.zip.CRC32 -import org.apache.samza.config.Config -import org.apache.samza.config.ConfigException -import org.apache.samza.config.ConfigRewriter -import org.apache.samza.config.JobConfig -import org.apache.samza.config.MapConfig -import org.apache.samza.config.SystemConfig import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.SystemConfig.Config2System +import org.apache.samza.config._ +import org.apache.samza.serializers._ +import org.apache.samza.system.{SystemFactory, SystemStream, SystemStreamPartition} +import org.apache.samza.{Partition, SamzaException} import scala.collection.JavaConverters._ -import java.io.InputStreamReader - - import scala.collection.immutable.Map -import org.apache.samza.serializers._ + object Util extends Logging { val random = new Random @@ -188,8 +181,8 @@ object Util extends Logging { } /** - * Generates a coordinator stream name based off of the job name and job id - * for the jobd. The format is of the stream name will be + * Generates a coordinator stream name based on the job name and job id + * for the job. The format of the stream name will be: * __samza_coordinator_<JOBNAME>_<JOBID>. */ def getCoordinatorStreamName(jobName: String, jobId: String) = { @@ -224,7 +217,7 @@ object Util extends Logging { } /** - * Get the Coordinator System and system factory from the configuration + * Get the coordinator system and system factory from the configuration * @param config * @return */ http://git-wip-us.apache.org/repos/asf/samza/blob/8ee61a68/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 6461f9d..1e22763 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -21,23 +21,19 @@ package org.apache.samza.checkpoint.kafka import java.nio.ByteBuffer import java.util -import java.util.Properties +import java.util.{Collections, Properties} -import kafka.api._ -import kafka.common.{ErrorMapping, InvalidMessageSizeException, TopicAndPartition, UnknownTopicOrPartitionException} -import kafka.consumer.SimpleConsumer -import kafka.message.InvalidMessageException import kafka.utils.ZkUtils - import org.apache.kafka.common.utils.Utils -import org.apache.kafka.clients.producer.{Producer, ProducerRecord} -import org.apache.samza.SamzaException import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager} import org.apache.samza.container.TaskName import org.apache.samza.serializers.CheckpointSerde -import org.apache.samza.system.kafka.TopicMetadataCache +import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata +import org.apache.samza.system.{StreamSpec, SystemAdmin, _} import org.apache.samza.util._ +import org.apache.samza.{Partition, SamzaException} +import scala.collection.JavaConversions._ import scala.collection.mutable /** @@ -55,23 +51,26 @@ class KafkaCheckpointManager( socketTimeout: Int, bufferSize: Int, fetchSize: Int, + getSystemConsumer: () => SystemConsumer, + getSystemAdmin: () => SystemAdmin, val metadataStore: TopicMetadataStore, - connectProducer: () => Producer[Array[Byte], Array[Byte]], + getSystemProducer: () => SystemProducer, val connectZk: () => ZkUtils, systemStreamPartitionGrouperFactoryString: String, failOnCheckpointValidation: Boolean, val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, serde: CheckpointSerde = new CheckpointSerde, checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging { - import org.apache.samza.checkpoint.kafka.KafkaCheckpointManager._ var taskNames = Set[TaskName]() - var producer: Producer[Array[Byte], Array[Byte]] = null + @volatile var systemProducer: SystemProducer = null var taskNamesToOffsets: Map[TaskName, Checkpoint] = null + val systemAdmin = getSystemAdmin() - var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint val kafkaUtil: KafkaUtil = new KafkaUtil(retryBackoff, connectZk) + + KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString) info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName)) @@ -86,61 +85,34 @@ class KafkaCheckpointManager( val key = KafkaCheckpointLogKey.getCheckpointKey(taskName) val keyBytes = key.toBytes() val msgBytes = serde.toBytes(checkpoint) + val systemStream = new SystemStream(systemName, checkpointTopic) + val envelope = new OutgoingMessageEnvelope(systemStream, keyBytes, msgBytes) + retryBackoff.run( loop => { - if (producer == null) { - producer = connectProducer() + if (systemProducer == null) { + synchronized { + if (systemProducer == null) { + systemProducer = getSystemProducer() + systemProducer.register(taskName.getTaskName) + systemProducer.start + } + } } - producer.send(new ProducerRecord(checkpointTopic, 0, keyBytes, msgBytes)).get() + systemProducer.send(taskName.getTaskName, envelope) + systemProducer.flush(taskName.getTaskName) // make sure it is written + info("Completed writing checkpoint=%s into %s topic for system %s." format(checkpoint, checkpointTopic, systemName) ) loop.done }, (exception, loop) => { - warn("Failed to write %s partition entry %s: %s. Retrying." format(CHECKPOINT_LOG4J_ENTRY, key, exception)) + warn("Failed to write checkpoint log partition entry %s: %s. Retrying." format(key, exception)) debug("Exception detail:", exception) - if (producer != null) { - producer.close - } - producer = null } ) } - private def getConsumer(): SimpleConsumer = { - val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics)) - val metadata = metadataMap(checkpointTopic) - val partitionMetadata = metadata.partitionsMetadata - .filter(_.partitionId == 0) - .headOption - .getOrElse(throw new KafkaUtilException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka.")) - val leader = partitionMetadata - .leader - .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic)) - - info("Connecting to leader %s:%d for topic %s and to fetch all checkpoint messages." format(leader.host, leader.port, checkpointTopic)) - - new SimpleConsumer(leader.host, leader.port, socketTimeout, bufferSize, clientId) - } - - private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1) - - private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = { - val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))) - .partitionErrorAndOffsets - .get(topicAndPartition) - .getOrElse(throw new KafkaUtilException("Unable to find offset information for %s:0" format checkpointTopic)) - // Fail or retry if there was an an issue with the offset request. - KafkaUtil.maybeThrowException(offsetResponse.error) - - val offset: Long = offsetResponse - .offsets - .headOption - .getOrElse(throw new KafkaUtilException("Got response, but no offsets defined for %s:0" format checkpointTopic)) - - offset - } - /** * Read the last checkpoint for specified TaskName * @@ -179,122 +151,132 @@ class KafkaCheckpointManager( def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = { val taskName = checkpointKey.getCheckpointTaskName val checkpoint = serde.fromBytes(Utils.readBytes(payload)) - debug("Adding checkpoint " + checkpoint + " for taskName " + taskName) checkpoints.put(taskName, checkpoint) // replacing any existing, older checkpoints as we go } - readLog(CHECKPOINT_LOG4J_ENTRY, shouldHandleEntry, handleCheckpoint) + readLog(shouldHandleEntry, handleCheckpoint) checkpoints.toMap /* of the immutable kind */ } + private def getSSPMetadata(topic: String, partition: Partition): SystemStreamPartitionMetadata = { + val metaDataMap: java.util.Map[String, SystemStreamMetadata] = systemAdmin.getSystemStreamMetadata(Collections.singleton(topic)) + val checkpointMetadata: SystemStreamMetadata = metaDataMap.get(topic) + if (checkpointMetadata == null) { + throw new SamzaException("Cannot get metadata for system=%s, topic=%s" format(systemName, topic)) + } + + val partitionMetaData = checkpointMetadata.getSystemStreamPartitionMetadata().get(partition) + if (partitionMetaData == null) { + throw new SamzaException("Cannot get partitionMetaData for system=%s, topic=%s" format(systemName, topic)) + } + + return partitionMetaData + } /** - * Common code for reading both changelog partition mapping and change log + * Reads an entry from the checkpoint log and invokes the provided lambda on it. * - * @param entryType What type of entry to look for within the log key's * @param handleEntry Code to handle an entry in the log once it's found */ - private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean, + private def readLog(shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean, handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = { - retryBackoff.run[Unit]( - loop => { - val consumer = getConsumer() - - val topicAndPartition = new TopicAndPartition(checkpointTopic, 0) + val UNKNOWN_OFFSET = "-1" + var attempts = 10 + val POLL_TIMEOUT = 1000L + + val ssp: SystemStreamPartition = new SystemStreamPartition(systemName, checkpointTopic, new Partition(0)) + val systemConsumer = getSystemConsumer() + val partitionMetadata = getSSPMetadata(checkpointTopic, new Partition(0)) + // offsets returned are strings + val newestOffset = if (partitionMetadata.getNewestOffset == null) UNKNOWN_OFFSET else partitionMetadata.getNewestOffset + val oldestOffset = partitionMetadata.getOldestOffset + systemConsumer.register(ssp, oldestOffset) // checkpoint stream should always be read from the beginning + systemConsumer.start() + + var msgCount = 0 + try { + val emptyEnvelopes = util.Collections.emptyMap[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]] + // convert offsets to long + var currentOffset = UNKNOWN_OFFSET.toLong + val newestOffsetLong = newestOffset.toLong + val sspToPoll = Collections.singleton(ssp) + while (currentOffset < newestOffsetLong) { + + val envelopes: java.util.Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]] = try { - var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition)) - - info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType)) - - val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime) - - info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic)) - - if (offset < 0) { - info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic)) - return + systemConsumer.poll(sspToPoll, POLL_TIMEOUT) + } catch { + case e: Exception => { + // these exceptions are most likely intermediate + warn("Got %s exception while polling the consumer for checkpoints." format e) + if (attempts == 0) throw new SamzaException("Multiple attempts failed while reading the checkpoints. Giving up.", e) + attempts -= 1 + emptyEnvelopes } + } - while (offset < latestOffset) { - val request = new FetchRequestBuilder() - .addFetch(checkpointTopic, 0, offset, fetchSize) - .maxWait(500) - .minBytes(1) - .clientId(clientId) - .build - - val fetchResponse = consumer.fetch(request) - if (fetchResponse.hasError) { - warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0))) - val errorCode = fetchResponse.errorCode(checkpointTopic, 0) - if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) { - warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic)) - return - } - KafkaUtil.maybeThrowException(errorCode) + val messages: util.List[IncomingMessageEnvelope] = envelopes.get(ssp) + val messagesNum = if (messages != null) messages.size else 0 + info("CheckpointMgr read %s envelopes (%s messages) from ssp %s. Current offset is %s, newest is %s" + format (envelopes.size(), messagesNum, ssp, currentOffset, newestOffset)) + if (envelopes.isEmpty || messagesNum <= 0) { + info("Got empty/null list of messages") + } else { + msgCount += messages.size() + // check the key + for (msg: IncomingMessageEnvelope <- messages) { + val key = msg.getKey.asInstanceOf[Array[Byte]] + currentOffset = msg.getOffset().toLong + if (key == null) { + throw new KafkaUtilException("While reading checkpoint (currentOffset=%s) stream encountered message without key." + format currentOffset) } - for (response <- fetchResponse.messageSet(checkpointTopic, 0)) { - offset = response.nextOffset - startingOffset = Some(offset) // For next time we call - - if (!response.message.hasKey) { - throw new KafkaUtilException("Encountered message without key.") - } - - val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key)) + val checkpointKey = KafkaCheckpointLogKey.fromBytes(key) - if (!shouldHandleEntry(checkpointKey)) { - debug("Skipping " + entryType + " entry with key " + checkpointKey) - } else { - handleEntry(response.message.payload, checkpointKey) - } + if (!shouldHandleEntry(checkpointKey)) { + info("Skipping checkpoint log entry at offset %s with key %s." format(currentOffset, checkpointKey)) + } else { + // handleEntry requires ByteBuffer + val checkpointPayload = ByteBuffer.wrap(msg.getMessage.asInstanceOf[Array[Byte]]) + handleEntry(checkpointPayload, checkpointKey) } } - } finally { - consumer.close() - } - - loop.done - Unit - }, - - (exception, loop) => { - exception match { - case e: InvalidMessageException => throw new KafkaUtilException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e) - case e: InvalidMessageSizeException => throw new KafkaUtilException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e) - case e: UnknownTopicOrPartitionException => throw new KafkaUtilException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e) - case e: KafkaUtilException => throw e - case e: Exception => - warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." format(entryType, checkpointTopic, e)) - debug("Exception detail:", e) } } - ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic)) - + } finally { + systemConsumer.stop() + } + info("Done reading %s messages from checkpoint system:%s topic:%s" format(msgCount, systemName, checkpointTopic)) } - def start { + override def start { kafkaUtil.createTopic(checkpointTopic, 1, replicationFactor, checkpointTopicProperties) kafkaUtil.validateTopicPartitionCount(checkpointTopic, systemName, metadataStore, 1, failOnCheckpointValidation) } - def register(taskName: TaskName) { + override def register(taskName: TaskName) { debug("Adding taskName " + taskName + " to " + this) taskNames += taskName } + def stop = { - if (producer != null) { - producer.close - } + synchronized ( + if (systemProducer != null) { + systemProducer.stop + systemProducer = null + } + ) + } - override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic) -} + override def clearCheckpoints = { + info("Clear checkpoint stream %s in system %s" format (checkpointTopic, systemName)) + val spec = StreamSpec.createCheckpointStreamSpec(checkpointTopic, systemName) + systemAdmin.clearStream(spec) + } -object KafkaCheckpointManager { - val CHECKPOINT_LOG4J_ENTRY = "checkpoint log" - val CHANGELOG_PARTITION_MAPPING_LOG4j = "changelog partition mapping" + override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic) } http://git-wip-us.apache.org/repos/asf/samza/blob/8ee61a68/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index c42882e..402248f 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -22,14 +22,14 @@ package org.apache.samza.checkpoint.kafka import java.util.Properties import kafka.utils.ZkUtils -import org.apache.kafka.clients.producer.KafkaProducer import org.apache.samza.SamzaException import org.apache.samza.checkpoint.{CheckpointManager, CheckpointManagerFactory} import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.config.KafkaConfig.Config2Kafka -import org.apache.samza.config.{Config, KafkaConfig} +import org.apache.samza.config.{Config, KafkaConfig, SystemConfig} import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging} +import org.apache.samza.system.SystemFactory +import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging, Util, _} + object KafkaCheckpointManagerFactory { val INJECTED_PRODUCER_PROPERTIES = Map( @@ -46,12 +46,29 @@ object KafkaCheckpointManagerFactory { val segmentBytes: Int = if (config == null) { KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES } else { - config.getCheckpointSegmentBytes() + new KafkaConfig(config).getCheckpointSegmentBytes() } (new Properties /: Map( "cleanup.policy" -> "compact", "segment.bytes" -> String.valueOf(segmentBytes))) { case (props, (k, v)) => props.put(k, v); props } } + + /** + * Get the checkpoint system and system factory from the configuration + * @param config + * @return system name and system factory + */ + def getCheckpointSystemStreamAndFactory(config: Config) = { + + val kafkaConfig = new KafkaConfig(config) + val systemName = kafkaConfig.getCheckpointSystem.getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager.")) + + val systemFactoryClassName = new SystemConfig(config) + .getSystemFactory(systemName) + .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName)) + val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName) + (systemName, systemFactory) + } } class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging { @@ -62,19 +79,17 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs")) val jobId = config.getJobId.getOrElse("1") - val systemName = config - .getCheckpointSystem - .getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager.")) + val (systemName: String, systemFactory : SystemFactory) = getCheckpointSystemStreamAndFactory(config) - val producerConfig = config.getKafkaSystemProducerConfig( + val kafkaConfig = new KafkaConfig(config) + val producerConfig = kafkaConfig.getKafkaSystemProducerConfig( systemName, clientId, INJECTED_PRODUCER_PROPERTIES) - val connectProducer = () => { - new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) - } - val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId) + val noOpMetricsRegistry = new NoOpMetricsRegistry() + + val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(systemName, clientId) val zkConnect = Option(consumerConfig.zkConnect) .getOrElse(throw new SamzaException("no zookeeper.connect defined in config")) val connectZk = () => { @@ -85,14 +100,16 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin new KafkaCheckpointManager( clientId, - KafkaUtil.getCheckpointTopic(jobName, jobId), + KafkaUtil.getCheckpointTopic(jobName, jobId, config), systemName, - config.getCheckpointReplicationFactor.getOrElse("3").toInt, + kafkaConfig.getCheckpointReplicationFactor.get.toInt, socketTimeout, consumerConfig.socketReceiveBufferBytes, consumerConfig.fetchMessageMaxBytes, // must be > buffer size + () => systemFactory.getConsumer(systemName, config, noOpMetricsRegistry), + () => systemFactory.getAdmin(systemName, config), new ClientUtilTopicMetadataStore(producerConfig.bootsrapServers, clientId, socketTimeout), - connectProducer, + () => systemFactory.getProducer(systemName, config, noOpMetricsRegistry), connectZk, config.getSystemStreamPartitionGrouperFactory, // To find out the SSPGrouperFactory class so it can be included/verified in the key config.failOnCheckpointValidation, http://git-wip-us.apache.org/repos/asf/samza/blob/8ee61a68/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 9ac21ef..825b598 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -278,7 +278,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { injectedProps: Map[String, String] = Map()) = { val subConf = config.subset("systems.%s.producer." format systemName, true) - val producerProps = new util.HashMap[String, Object]() + val producerProps = new util.HashMap[String, String]() producerProps.putAll(subConf) producerProps.put("client.id", clientId) producerProps.putAll(injectedProps.asJava) @@ -288,7 +288,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { class KafkaProducerConfig(val systemName: String, val clientId: String = "", - properties: java.util.Map[String, Object] = new util.HashMap[String, Object]()) extends Logging { + properties: java.util.Map[String, String] = new util.HashMap[String, String]()) extends Logging { // Copied from new Kafka API - Workaround until KAFKA-1794 is resolved val RECONNECT_BACKOFF_MS_DEFAULT = 10L http://git-wip-us.apache.org/repos/asf/samza/blob/8ee61a68/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index a14812e..43b912d 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -19,22 +19,21 @@ package org.apache.samza.checkpoint.kafka -import kafka.admin.AdminUtils -import kafka.common.{InvalidMessageSizeException, UnknownTopicOrPartitionException} -import kafka.message.InvalidMessageException -import kafka.server.{KafkaConfig, KafkaServer, ConfigType} -import kafka.utils.{CoreUtils, TestUtils, ZkUtils} -import kafka.integration.KafkaServerTestHarness - -import org.apache.kafka.common.security.JaasUtils +import _root_.kafka.admin.AdminUtils +import _root_.kafka.common.{InvalidMessageSizeException, UnknownTopicOrPartitionException} +import _root_.kafka.integration.KafkaServerTestHarness +import _root_.kafka.message.InvalidMessageException +import _root_.kafka.server.{ConfigType, KafkaConfig} +import _root_.kafka.utils.{CoreUtils, TestUtils, ZkUtils} import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.security.JaasUtils import org.apache.samza.checkpoint.Checkpoint -import org.apache.samza.config.{JobConfig, KafkaProducerConfig, MapConfig} +import org.apache.samza.config._ import org.apache.samza.container.TaskName import org.apache.samza.container.grouper.stream.GroupByPartitionFactory import org.apache.samza.serializers.CheckpointSerde -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtilException, TopicMetadataStore} +import org.apache.samza.system._ +import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtilException, NoOpMetricsRegistry, TopicMetadataStore} import org.apache.samza.{Partition, SamzaException} import org.junit.Assert._ import org.junit._ @@ -69,23 +68,40 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName + var systemConsumerFn: ()=>SystemConsumer = ()=>{null} + var systemProducerFn: ()=>SystemProducer = ()=>{null} + var systemAdminFn: ()=>SystemAdmin = ()=>{null} + @Before override def setUp { super.setUp TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update") - val config = new java.util.HashMap[String, Object]() + val systemName = "kafka" val brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",") - + val config = new java.util.HashMap[String, String]() config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) config.put("acks", "all") config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString) config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES.asJava) - producerConfig = new KafkaProducerConfig("kafka", "i001", config) + producerConfig = new KafkaProducerConfig(systemName, "i001", config) metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name") + + config.put(SystemConfig.SYSTEM_FACTORY format systemName, "org.apache.samza.system.kafka.KafkaSystemFactory") + config.put(org.apache.samza.config.KafkaConfig.CHECKPOINT_SYSTEM, systemName); + config.put(JobConfig.JOB_NAME, "some-job-name"); + config.put(JobConfig.JOB_ID, "i001"); + config.put("systems.%s.producer.%s" format (systemName, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), brokers) + config.put("systems.%s.consumer.zookeeper.connect" format systemName, zkConnect) + val cfg: SystemConfig = new SystemConfig(new MapConfig(config)) + val (systemStreamName: String, systemConsumerFactory : SystemFactory) = + KafkaCheckpointManagerFactory.getCheckpointSystemStreamAndFactory(cfg) + systemConsumerFn = () => {systemConsumerFactory.getConsumer(systemStreamName, cfg, new NoOpMetricsRegistry())} + systemProducerFn = () => {systemConsumerFactory.getProducer(systemStreamName, cfg, new NoOpMetricsRegistry())} + systemAdminFn = () => {systemConsumerFactory.getAdmin(systemStreamName, cfg)} } @After @@ -171,6 +187,45 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { kcm.stop } + + @Test + def testCheckpointReadTwice { + val kcm = getKafkaCheckpointManager + val taskName = new TaskName(partition.toString) + kcm.register(taskName) + createCheckpointTopic() + kcm.kafkaUtil.validateTopicPartitionCount(checkpointTopic, "kafka", metadataStore, 1) + + // check that log compaction is enabled. + val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure) + val topicConfig = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, checkpointTopic) + zkClient.close + assertEquals("compact", topicConfig.get("cleanup.policy")) + assertEquals("26214400", topicConfig.get("segment.bytes")) + + // read before topic exists should result in a null checkpoint + var readCp = kcm.readLastCheckpoint(taskName) + assertNull(readCp) + + // create topic the first time around + writeCheckpoint(taskName, cp1) + readCp = kcm.readLastCheckpoint(taskName) + assertEquals(cp1, readCp) + + // writing a second message should work, too + writeCheckpoint(taskName, cp2) + readCp = kcm.readLastCheckpoint(taskName) + assertEquals(cp2, readCp) + kcm.stop + + // get new KCM for the same stream + val kcm1 = getKafkaCheckpointManager + kcm1.register(taskName) + readCp = kcm1.readLastCheckpoint(taskName) + assertEquals(cp2, readCp) + kcm1.stop + } + @Test def testUnrecoverableKafkaErrorShouldThrowKafkaCheckpointManagerException { val exceptions = List("InvalidMessageException", "InvalidMessageSizeException", "UnknownTopicOrPartitionException") @@ -184,9 +239,10 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { // because serde will throw unrecoverable errors, it should result a KafkaCheckpointException try { kcm.readLastCheckpoint(taskName) - fail("Expected a KafkaUtilException.") + fail("Expected an Exception.") } catch { case e: KafkaUtilException => None + case e: Exception => None } kcm.stop } @@ -229,8 +285,10 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { socketTimeout = 30000, bufferSize = 64 * 1024, fetchSize = 300 * 1024, + getSystemConsumer = systemConsumerFn, + getSystemAdmin = systemAdminFn, metadataStore = metadataStore, - connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties), + getSystemProducer = systemProducerFn, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString, failOnCheckpointValidation = failOnTopicValidation, @@ -248,8 +306,10 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { socketTimeout = 30000, bufferSize = 64 * 1024, fetchSize = 300 * 1024, + getSystemConsumer = systemConsumerFn, + getSystemAdmin = systemAdminFn, metadataStore = metadataStore, - connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties), + getSystemProducer = systemProducerFn, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString, failOnCheckpointValidation = failOnTopicValidation, http://git-wip-us.apache.org/repos/asf/samza/blob/8ee61a68/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index 19f3903..214d9e6 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -69,7 +69,7 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness { override def setUp { super.setUp - val config = new java.util.HashMap[String, Object]() + val config = new java.util.HashMap[String, String]() brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",") http://git-wip-us.apache.org/repos/asf/samza/blob/8ee61a68/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala index 59e9a89..6c1b184 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala @@ -125,7 +125,7 @@ object StreamTaskTestUtil { jobConfig ++= Map("systems.kafka.consumer.zookeeper.connect" -> zkConnect, "systems.kafka.producer.bootstrap.servers" -> brokers) - val config = new util.HashMap[String, Object]() + val config = new util.HashMap[String, String]() config.put("bootstrap.servers", brokers) config.put("request.required.acks", "-1")
