http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala deleted file mode 100644 index 958d07c..0000000 --- a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package old.checkpoint - -import java.util - -import org.apache.samza.SamzaException -import org.apache.samza.container.TaskName -import org.codehaus.jackson.`type`.TypeReference -import org.codehaus.jackson.map.ObjectMapper - -import scala.collection.JavaConversions._ - -/** - * Kafka Checkpoint Log-specific key used to identify what type of entry is - * written for any particular log entry. - * - * @param map Backing map to hold key values - */ -class KafkaCheckpointLogKey private (val map: Map[String, String]) { - // This might be better as a case class... 
- import KafkaCheckpointLogKey._ - - /** - * Serialize this key to bytes - * @return Key as bytes - */ - def toBytes(): Array[Byte] = { - val jMap = new util.HashMap[String, String](map.size) - jMap.putAll(map) - - JSON_MAPPER.writeValueAsBytes(jMap) - } - - private def getKey = map.getOrElse(CHECKPOINT_KEY_KEY, throw new SamzaException("No " + CHECKPOINT_KEY_KEY + " in map for Kafka Checkpoint log key")) - - /** - * Is this key for a checkpoint entry? - * - * @return true iff this key's entry is for a checkpoint - */ - def isCheckpointKey = getKey.equals(CHECKPOINT_KEY_TYPE) - - /** - * Is this key for a changelog partition mapping? - * - * @return true iff this key's entry is for a changelog partition mapping - */ - def isChangelogPartitionMapping = getKey.equals(CHANGELOG_PARTITION_KEY_TYPE) - - /** - * If this Key is for a checkpoint entry, return its associated TaskName. - * - * @return TaskName for this checkpoint or throw an exception if this key does not have a TaskName entry - */ - def getCheckpointTaskName = { - val asString = map.getOrElse(CHECKPOINT_TASKNAME_KEY, throw new SamzaException("No TaskName in checkpoint key: " + this)) - new TaskName(asString) - } - - def canEqual(other: Any): Boolean = other.isInstanceOf[KafkaCheckpointLogKey] - - override def equals(other: Any): Boolean = other match { - case that: KafkaCheckpointLogKey => - (that canEqual this) && - map == that.map - case _ => false - } - - override def hashCode(): Int = { - val state = Seq(map) - state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) - } -} - -object KafkaCheckpointLogKey { - /** - * Messages in the checkpoint log have keys associated with them. These keys are maps that describe the message's - * type, either a checkpoint or a changelog-partition-mapping. 
- */ - val CHECKPOINT_KEY_KEY = "type" - val CHECKPOINT_KEY_TYPE = "checkpoint" - val CHANGELOG_PARTITION_KEY_TYPE = "changelog-partition-mapping" - val CHECKPOINT_TASKNAME_KEY = "taskName" - val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = "systemstreampartition-grouper-factory" - - /** - * Partition mapping keys have no dynamic values, so we just need one instance. - */ - val CHANGELOG_PARTITION_MAPPING_KEY = new KafkaCheckpointLogKey(Map(CHECKPOINT_KEY_KEY -> CHANGELOG_PARTITION_KEY_TYPE)) - - private val JSON_MAPPER = new ObjectMapper() - val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {} - - var systemStreamPartitionGrouperFactoryString:Option[String] = None - - /** - * Set the name of the factory configured to provide the SystemStreamPartition grouping - * so it be included in the key. - * - * @param str Config value of SystemStreamPartition Grouper Factory - */ - def setSystemStreamPartitionGrouperFactoryString(str:String) = { - systemStreamPartitionGrouperFactoryString = Some(str) - } - - /** - * Get the name of the factory configured to provide the SystemStreamPartition grouping - * so it be included in the key - */ - def getSystemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString.getOrElse(throw new SamzaException("No SystemStreamPartition grouping factory string has been set.")) - - /** - * Build a key for a a checkpoint log entry for a particular TaskName - * @param taskName TaskName to build for this checkpoint entry - * - * @return Key for checkpoint log entry - */ - def getCheckpointKey(taskName:TaskName) = { - val map = Map(CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE, - CHECKPOINT_TASKNAME_KEY -> taskName.getTaskName, - SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY -> getSystemStreamPartitionGrouperFactoryString) - - new KafkaCheckpointLogKey(map) - } - - /** - * Build a key for a changelog partition mapping entry - * - * @return Key for changelog partition mapping entry - */ - def 
getChangelogPartitionMappingKey() = CHANGELOG_PARTITION_MAPPING_KEY - - /** - * Deserialize a Kafka checkpoint log key - * @param bytes Serialized (via JSON) Kafka checkpoint log key - * @return Checkpoint log key - */ - def fromBytes(bytes: Array[Byte]): KafkaCheckpointLogKey = { - try { - val jmap: util.HashMap[String, String] = JSON_MAPPER.readValue(bytes, KEY_TYPEREFERENCE) - - if(!jmap.containsKey(CHECKPOINT_KEY_KEY)) { - throw new SamzaException("No type entry in checkpoint key: " + jmap) - } - - // Only checkpoint keys have ssp grouper factory keys - if(jmap.get(CHECKPOINT_KEY_KEY).equals(CHECKPOINT_KEY_TYPE)) { - val sspGrouperFactory = jmap.get(SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY) - - if (sspGrouperFactory == null) { - throw new SamzaException("No SystemStreamPartition Grouper factory entry in checkpoint key: " + jmap) - } - - if (!sspGrouperFactory.equals(getSystemStreamPartitionGrouperFactoryString)) { - throw new DifferingSystemStreamPartitionGrouperFactoryValues(sspGrouperFactory, getSystemStreamPartitionGrouperFactoryString) - } - } - - new KafkaCheckpointLogKey(jmap.toMap) - } catch { - case e: Exception => - throw new SamzaException("Exception while deserializing checkpoint key", e) - } - } -} - -class DifferingSystemStreamPartitionGrouperFactoryValues(inKey:String, inConfig:String) extends SamzaException { - override def getMessage() = "Checkpoint key's SystemStreamPartition Grouper factory (" + inKey + - ") does not match value from current configuration (" + inConfig + "). " + - "This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported." -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala deleted file mode 100644 index 627631a..0000000 --- a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package old.checkpoint - -import java.nio.ByteBuffer -import java.util -import java.util.Properties - -import kafka.admin.AdminUtils -import kafka.api._ -import kafka.common.{ErrorMapping, InvalidMessageSizeException, TopicAndPartition, TopicExistsException, UnknownTopicOrPartitionException} -import kafka.consumer.SimpleConsumer -import kafka.message.InvalidMessageException -import kafka.utils.Utils -import org.I0Itec.zkclient.ZkClient -import org.apache.kafka.clients.producer.{Producer, ProducerRecord} -import org.apache.samza.SamzaException -import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager} -import org.apache.samza.container.TaskName -import org.apache.samza.serializers.CheckpointSerde -import org.apache.samza.system.kafka.TopicMetadataCache -import org.apache.samza.util.{ExponentialSleepStrategy, KafkaUtil, Logging, TopicMetadataStore} - -import scala.collection.mutable - -/** - * Kafka checkpoint manager is used to store checkpoints in a Kafka topic. - * To read a checkpoint for a specific taskName, we find the newest message - * keyed to that taskName. If there is no such message, no checkpoint data - * exists. The underlying log has a single partition into which all - * checkpoints and TaskName to changelog partition mappings are written. 
- */ -class KafkaCheckpointManager(clientId: String, - checkpointTopic: String, - systemName: String, - socketTimeout: Int, - bufferSize: Int, - fetchSize: Int, - metadataStore: TopicMetadataStore, - connectProducer: () => Producer[Array[Byte], Array[Byte]], - connectZk: () => ZkClient, - systemStreamPartitionGrouperFactoryString: String, - retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, - serde: CheckpointSerde = new CheckpointSerde, - checkpointTopicProperties: Properties = new Properties) extends Logging { - import KafkaCheckpointManager._ - - var taskNames = Set[TaskName]() - var producer: Producer[Array[Byte], Array[Byte]] = null - var taskNamesToOffsets: Map[TaskName, Checkpoint] = null - - var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint - - KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString) - - info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName)) - - private def getConsumer(): SimpleConsumer = { - val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics)) - val metadata = metadataMap(checkpointTopic) - val partitionMetadata = metadata.partitionsMetadata - .filter(_.partitionId == 0) - .headOption - .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka.")) - val leader = partitionMetadata - .leader - .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic)) - - info("Connecting to leader %s:%d for topic %s and to fetch all checkpoint messages." 
format(leader.host, leader.port, checkpointTopic)) - - new SimpleConsumer(leader.host, leader.port, socketTimeout, bufferSize, clientId) - } - - private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1) - - private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = { - val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))) - .partitionErrorAndOffsets - .get(topicAndPartition) - .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:0" format checkpointTopic)) - // Fail or retry if there was an an issue with the offset request. - KafkaUtil.maybeThrowException(offsetResponse.error) - - val offset: Long = offsetResponse - .offsets - .headOption - .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:0" format checkpointTopic)) - - offset - } - - /** - * Read the last checkpoint for specified TaskName - * - * @param taskName Specific Samza taskName for which to get the last checkpoint of. - */ - def readLastCheckpoint(taskName: TaskName): Checkpoint = { - if (!taskNames.contains(taskName)) { - throw new SamzaException(taskName + " not registered with this CheckpointManager") - } - - info("Reading checkpoint for taskName " + taskName) - - if (taskNamesToOffsets == null) { - info("No TaskName to checkpoint mapping provided. Reading for first time.") - taskNamesToOffsets = readCheckpointsFromLog() - } else { - info("Already existing checkpoint mapping. 
Merging new offsets") - taskNamesToOffsets ++= readCheckpointsFromLog() - } - - val checkpoint = taskNamesToOffsets.get(taskName).getOrElse(null) - - info("Got checkpoint state for taskName %s: %s" format(taskName, checkpoint)) - - checkpoint - } - - /** - * Read through entire log, discarding changelog mapping, and building map of TaskNames to Checkpoints - */ - def readCheckpointsFromLog(): Map[TaskName, Checkpoint] = { - val checkpoints = mutable.Map[TaskName, Checkpoint]() - - def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isCheckpointKey - - def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = { - val taskName = checkpointKey.getCheckpointTaskName - val checkpoint = serde.fromBytes(Utils.readBytes(payload)) - debug("Adding checkpoint " + checkpoint + " for taskName " + taskName) - checkpoints.put(taskName, checkpoint) // replacing any existing, older checkpoints as we go - } - - readLog(CHECKPOINT_LOG4J_ENTRY, shouldHandleEntry, handleCheckpoint) - - checkpoints.toMap /* of the immutable kind */ - } - - /** - * Read through entire log, discarding checkpoints, finding latest changelogPartitionMapping - * - * Lots of duplicated code from the checkpoint method, but will be better to refactor this code into AM-based - * checkpoint log reading - */ - def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = { - var changelogPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]() - - def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isChangelogPartitionMapping - - def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = { - changelogPartitionMapping = serde.changelogPartitionMappingFromBytes(Utils.readBytes(payload)) - - debug("Adding changelog partition mapping" + changelogPartitionMapping) - } - - readLog(CHANGELOG_PARTITION_MAPPING_LOG4j, shouldHandleEntry, handleCheckpoint) - - changelogPartitionMapping - } - - /** - 
* Common code for reading both changelog partition mapping and change log - * - * @param entryType What type of entry to look for within the log key's - * @param handleEntry Code to handle an entry in the log once it's found - */ - private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean, - handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = { - retryBackoff.run[Unit]( - loop => { - val consumer = getConsumer() - - val topicAndPartition = new TopicAndPartition(checkpointTopic, 0) - - try { - var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition)) - - info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType)) - - val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime) - - info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic)) - - if (offset < 0) { - info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic)) - return - } - - while (offset < latestOffset) { - val request = new FetchRequestBuilder() - .addFetch(checkpointTopic, 0, offset, fetchSize) - .maxWait(500) - .minBytes(1) - .clientId(clientId) - .build - - val fetchResponse = consumer.fetch(request) - if (fetchResponse.hasError) { - warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0))) - val errorCode = fetchResponse.errorCode(checkpointTopic, 0) - if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) { - warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." 
format (entryType, checkpointTopic)) - return - } - KafkaUtil.maybeThrowException(errorCode) - } - - for (response <- fetchResponse.messageSet(checkpointTopic, 0)) { - offset = response.nextOffset - startingOffset = Some(offset) // For next time we call - - if (!response.message.hasKey) { - throw new KafkaCheckpointException("Encountered message without key.") - } - - val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key)) - - if (!shouldHandleEntry(checkpointKey)) { - debug("Skipping " + entryType + " entry with key " + checkpointKey) - } else { - handleEntry(response.message.payload, checkpointKey) - } - } - } - } finally { - consumer.close() - } - - loop.done - Unit - }, - - (exception, loop) => { - exception match { - case e: InvalidMessageException => throw new KafkaCheckpointException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e) - case e: InvalidMessageSizeException => throw new KafkaCheckpointException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e) - case e: UnknownTopicOrPartitionException => throw new KafkaCheckpointException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e) - case e: KafkaCheckpointException => throw e - case e: Exception => - warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." format(entryType, checkpointTopic, e)) - debug("Exception detail:", e) - } - } - ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic)) - - } - - def topicExists: Boolean = { - val zkClient = connectZk() - try { - AdminUtils.topicExists(zkClient, checkpointTopic) - } finally { - zkClient.close() - } - } - - def start { - if (topicExists) { - validateTopic - } else { - throw new SamzaException("Failed to start KafkaCheckpointManager for non-existing checkpoint topic. 
KafkaCheckpointManager should only be used for migration purpose.") - } - } - - def register(taskName: TaskName) { - debug("Adding taskName " + taskName + " to " + this) - taskNames += taskName - } - - def stop = { - if (producer != null) { - producer.close - } - } - - def validateTopic = { - info("Validating checkpoint topic %s." format checkpointTopic) - retryBackoff.run( - loop => { - val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, metadataStore.getTopicInfo) - val topicMetadata = topicMetadataMap(checkpointTopic) - KafkaUtil.maybeThrowException(topicMetadata.errorCode) - - val partitionCount = topicMetadata.partitionsMetadata.length - if (partitionCount != 1) { - throw new KafkaCheckpointException("Checkpoint topic validation failed for topic %s because partition count %s did not match expected partition count of 1." format(checkpointTopic, topicMetadata.partitionsMetadata.length)) - } - - info("Successfully validated checkpoint topic %s." format checkpointTopic) - loop.done - }, - - (exception, loop) => { - exception match { - case e: KafkaCheckpointException => throw e - case e: Exception => - warn("While trying to validate topic %s: %s. Retrying." format(checkpointTopic, e)) - debug("Exception detail:", e) - } - } - ) - } - - override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic) -} - -object KafkaCheckpointManager { - val CHECKPOINT_LOG4J_ENTRY = "checkpoint log" - val CHANGELOG_PARTITION_MAPPING_LOG4j = "changelog partition mapping" -} - -/** - * KafkaCheckpointManager handles retries, so we need two kinds of exceptions: - * one to signal a hard failure, and the other to retry. The - * KafkaCheckpointException is thrown to indicate a hard failure that the Kafka - * CheckpointManager can't recover from. 
- */ -class KafkaCheckpointException(s: String, t: Throwable) extends SamzaException(s, t) { - def this(s: String) = this(s, null) -} http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala deleted file mode 100644 index 189752a..0000000 --- a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package old.checkpoint - -import java.util.Properties - -import kafka.utils.ZKStringSerializer -import org.I0Itec.zkclient.ZkClient -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.samza.SamzaException -import org.apache.samza.checkpoint.CheckpointManager -import org.apache.samza.config.Config -import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.config.KafkaConfig.Config2Kafka -import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging} - -object KafkaCheckpointManagerFactory { - /** - * Version number to track the format of the checkpoint log - */ - val CHECKPOINT_LOG_VERSION_NUMBER = 1 - - val INJECTED_PRODUCER_PROPERTIES = Map( - "acks" -> "all", - // Forcibly disable compression because Kafka doesn't support compression - // on log compacted topics. Details in SAMZA-586. - "compression.type" -> "none") - - // Set the checkpoint topic configs to have a very small segment size and - // enable log compaction. This keeps job startup time small since there - // are fewer useless (overwritten) messages to read from the checkpoint - // topic. 
- def getCheckpointTopicProperties(config: Config) = { - val segmentBytes = "26214400" - - (new Properties /: Map( - "cleanup.policy" -> "compact", - "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props } - } -} - -class KafkaCheckpointManagerFactory extends Logging { - import KafkaCheckpointManagerFactory._ - - def getCheckpointManager(config: Config, registry: MetricsRegistry): KafkaCheckpointManager = { - val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config) - val systemName = Option(config.get("task.checkpoint.system")).getOrElse(throw new SamzaException("no system defined for checkpoint manager, cannot perform migration.")) - val producerConfig = config.getKafkaSystemProducerConfig( - systemName, - clientId, - INJECTED_PRODUCER_PROPERTIES) - val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId) - val socketTimeout = consumerConfig.socketTimeoutMs - val bufferSize = consumerConfig.socketReceiveBufferBytes - val fetchSize = consumerConfig.fetchMessageMaxBytes // must be > buffer size - - val connectProducer = () => { - new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) - } - val zkConnect = Option(consumerConfig.zkConnect) - .getOrElse(throw new SamzaException("no zookeeper.connect defined in config")) - val connectZk = () => { - new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer) - } - val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs")) - val jobId = config.getJobId.getOrElse("1") - val bootstrapServers = producerConfig.bootsrapServers - val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, socketTimeout) - val checkpointTopic = getTopic(jobName, jobId) - - // Find out the SSPGrouperFactory class so it can be included/verified in the key - val systemStreamPartitionGrouperFactoryString = config.getSystemStreamPartitionGrouperFactory - - new KafkaCheckpointManager( - clientId, - 
checkpointTopic, - systemName, - socketTimeout, - bufferSize, - fetchSize, - metadataStore, - connectProducer, - connectZk, - systemStreamPartitionGrouperFactoryString, - checkpointTopicProperties = getCheckpointTopicProperties(config)) - } - - private def getTopic(jobName: String, jobId: String) = - "__samza_checkpoint_ver_%d_for_%s_%s" format (CHECKPOINT_LOG_VERSION_NUMBER, jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-")) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala b/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala deleted file mode 100644 index 32afe4c..0000000 --- a/samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package old.checkpoint - -import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager} -import org.apache.samza.config.Config -import org.apache.samza.container.TaskName -import org.apache.samza.coordinator.JobCoordinator -import org.apache.samza.coordinator.stream.messages.{CoordinatorStreamMessage, SetMigrationMetaMessage} -import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamSystemFactory} -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.migration.MigrationPlan -import org.apache.samza.storage.ChangelogPartitionManager -import org.apache.samza.util.{Logging, NoOpMetricsRegistry} -import scala.collection.JavaConverters._ - -class KafkaCheckpointMigration extends MigrationPlan with Logging { - val source = "CHECKPOINTMIGRATION" - val migrationKey = "CheckpointMigration09to10" - val migrationVal = "true" - - def migrate(config: Config, getManager:() => KafkaCheckpointManager): Unit = { - val coordinatorFactory = new CoordinatorStreamSystemFactory - val coordinatorSystemProducer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap) - var manager = getManager() - // make sure to validate that we only perform migration when the checkpoint topic exists - if (manager.topicExists) { - manager.validateTopic - val checkpointMap = manager.readCheckpointsFromLog() - manager.stop - - manager = getManager() - val changelogMap = manager.readChangeLogPartitionMapping() - manager.stop - - val coordinatorSystemConsumer = coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap) - if (migrationVerification(coordinatorSystemConsumer)) { - info("Migration %s was already performed, doing nothing" format migrationKey) - return - } - info("No previous migration for %s were detected, performing migration" format migrationKey) - val checkpointManager = new CheckpointManager(coordinatorSystemProducer, 
coordinatorSystemConsumer, source) - checkpointManager.start() - checkpointMap.foreach { case (taskName: TaskName, checkpoint: Checkpoint) => checkpointManager.writeCheckpoint(taskName, checkpoint) } - val changelogPartitionManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, source) - changelogPartitionManager.start() - changelogPartitionManager.writeChangeLogPartitionMapping(changelogMap) - changelogPartitionManager.stop() - checkpointManager.stop() - } - migrationCompletionMark(coordinatorSystemProducer) - } - - override def migrate(config: Config) { - val factory = new KafkaCheckpointManagerFactory - def getManager() = factory.getCheckpointManager(config, new NoOpMetricsRegistry) - migrate(config, getManager) - } - - def migrationVerification(coordinatorSystemConsumer : CoordinatorStreamSystemConsumer): Boolean = { - coordinatorSystemConsumer.register() - coordinatorSystemConsumer.start() - coordinatorSystemConsumer.bootstrap() - val stream = coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE) - val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal) - stream.contains(message.asInstanceOf[CoordinatorStreamMessage]) - } - - def migrationCompletionMark(coordinatorSystemProducer: CoordinatorStreamSystemProducer) = { - info("Marking completion of migration %s" format migrationKey) - val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal) - coordinatorSystemProducer.start() - coordinatorSystemProducer.send(message) - coordinatorSystemProducer.stop() - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala new file mode 100644 index 0000000..ea8462d --- /dev/null +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.checkpoint.kafka + +import java.util + +import org.apache.samza.SamzaException +import org.apache.samza.container.TaskName +import org.codehaus.jackson.`type`.TypeReference +import org.codehaus.jackson.map.ObjectMapper + +import scala.collection.JavaConversions._ + +/** + * Kafka Checkpoint Log-specific key used to identify what type of entry is + * written for any particular log entry. + * + * @param map Backing map to hold key values + */ +class KafkaCheckpointLogKey private (val map: Map[String, String]) { + // This might be better as a case class... 
+ import org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey._ + + /** + * Serialize this key to bytes + * @return Key as bytes + */ + def toBytes(): Array[Byte] = { + val jMap = new util.HashMap[String, String](map.size) + jMap.putAll(map) + + JSON_MAPPER.writeValueAsBytes(jMap) + } + + private def getKey = map.getOrElse(CHECKPOINT_KEY_KEY, throw new SamzaException("No " + CHECKPOINT_KEY_KEY + " in map for Kafka Checkpoint log key")) + + /** + * Is this key for a checkpoint entry? + * + * @return true iff this key's entry is for a checkpoint + */ + def isCheckpointKey = getKey.equals(CHECKPOINT_KEY_TYPE) + + /** + * Is this key for a changelog partition mapping? + * + * @return true iff this key's entry is for a changelog partition mapping + */ + @Deprecated + def isChangelogPartitionMapping = getKey.equals(CHANGELOG_PARTITION_KEY_TYPE) + + /** + * If this Key is for a checkpoint entry, return its associated TaskName. + * + * @return TaskName for this checkpoint or throw an exception if this key does not have a TaskName entry + */ + def getCheckpointTaskName = { + val asString = map.getOrElse(CHECKPOINT_TASKNAME_KEY, throw new SamzaException("No TaskName in checkpoint key: " + this)) + new TaskName(asString) + } + + def canEqual(other: Any): Boolean = other.isInstanceOf[KafkaCheckpointLogKey] + + override def equals(other: Any): Boolean = other match { + case that: KafkaCheckpointLogKey => + (that canEqual this) && + map == that.map + case _ => false + } + + override def hashCode(): Int = { + val state = Seq(map) + state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) + } +} + +object KafkaCheckpointLogKey { + /** + * Messages in the checkpoint log have keys associated with them. These keys are maps that describe the message's + * type, either a checkpoint or a changelog-partition-mapping. 
+ */ + val CHECKPOINT_KEY_KEY = "type" + val CHECKPOINT_KEY_TYPE = "checkpoint" + + @Deprecated + val CHANGELOG_PARTITION_KEY_TYPE = "changelog-partition-mapping" + + val CHECKPOINT_TASKNAME_KEY = "taskName" + val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = "systemstreampartition-grouper-factory" + + /** + * Partition mapping keys have no dynamic values, so we just need one instance. + */ + @Deprecated + val CHANGELOG_PARTITION_MAPPING_KEY = new KafkaCheckpointLogKey(Map(CHECKPOINT_KEY_KEY -> CHANGELOG_PARTITION_KEY_TYPE)) + + private val JSON_MAPPER = new ObjectMapper() + val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {} + + var systemStreamPartitionGrouperFactoryString:Option[String] = None + + /** + * Set the name of the factory configured to provide the SystemStreamPartition grouping + * so it be included in the key. + * + * @param str Config value of SystemStreamPartition Grouper Factory + */ + def setSystemStreamPartitionGrouperFactoryString(str:String) = { + systemStreamPartitionGrouperFactoryString = Some(str) + } + + /** + * Get the name of the factory configured to provide the SystemStreamPartition grouping + * so it be included in the key + */ + def getSystemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString.getOrElse(throw new SamzaException("No SystemStreamPartition grouping factory string has been set.")) + + /** + * Build a key for a a checkpoint log entry for a particular TaskName + * @param taskName TaskName to build for this checkpoint entry + * + * @return Key for checkpoint log entry + */ + def getCheckpointKey(taskName:TaskName) = { + val map = Map(CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE, + CHECKPOINT_TASKNAME_KEY -> taskName.getTaskName, + SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY -> getSystemStreamPartitionGrouperFactoryString) + + new KafkaCheckpointLogKey(map) + } + + /** + * Build a key for a changelog partition mapping entry + * + * @return Key for changelog partition mapping entry 
+ */ + @Deprecated + def getChangelogPartitionMappingKey() = CHANGELOG_PARTITION_MAPPING_KEY + + /** + * Deserialize a Kafka checkpoint log key + * @param bytes Serialized (via JSON) Kafka checkpoint log key + * @return Checkpoint log key + */ + def fromBytes(bytes: Array[Byte]): KafkaCheckpointLogKey = { + try { + val jmap: util.HashMap[String, String] = JSON_MAPPER.readValue(bytes, KEY_TYPEREFERENCE) + + if(!jmap.containsKey(CHECKPOINT_KEY_KEY)) { + throw new SamzaException("No type entry in checkpoint key: " + jmap) + } + + // Only checkpoint keys have ssp grouper factory keys + if(jmap.get(CHECKPOINT_KEY_KEY).equals(CHECKPOINT_KEY_TYPE)) { + val sspGrouperFactory = jmap.get(SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY) + + if (sspGrouperFactory == null) { + throw new SamzaException("No SystemStreamPartition Grouper factory entry in checkpoint key: " + jmap) + } + + if (!sspGrouperFactory.equals(getSystemStreamPartitionGrouperFactoryString)) { + throw new DifferingSystemStreamPartitionGrouperFactoryValues(sspGrouperFactory, getSystemStreamPartitionGrouperFactoryString) + } + } + + new KafkaCheckpointLogKey(jmap.toMap) + } catch { + case e: Exception => + throw new SamzaException("Exception while deserializing checkpoint key", e) + } + } +} + +class DifferingSystemStreamPartitionGrouperFactoryValues(inKey:String, inConfig:String) extends SamzaException { + override def getMessage() = "Checkpoint key's SystemStreamPartition Grouper factory (" + inKey + + ") does not match value from current configuration (" + inConfig + "). " + + "This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported." 
+} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala new file mode 100644 index 0000000..787de1f --- /dev/null +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.checkpoint.kafka + +import java.nio.ByteBuffer +import java.util +import java.util.Properties + +import kafka.api._ +import kafka.common.{ErrorMapping, InvalidMessageSizeException, TopicAndPartition, UnknownTopicOrPartitionException} +import kafka.consumer.SimpleConsumer +import kafka.message.InvalidMessageException +import kafka.utils.Utils +import org.I0Itec.zkclient.ZkClient +import org.apache.kafka.clients.producer.{Producer, ProducerRecord} +import org.apache.samza.SamzaException +import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager} +import org.apache.samza.container.TaskName +import org.apache.samza.serializers.CheckpointSerde +import org.apache.samza.system.kafka.TopicMetadataCache +import org.apache.samza.util._ + +import scala.collection.mutable + +/** + * Kafka checkpoint manager is used to store checkpoints in a Kafka topic. + * To read a checkpoint for a specific taskName, we find the newest message + * keyed to that taskName. If there is no such message, no checkpoint data + * exists. The underlying log has a single partition into which all + * checkpoints and TaskName to changelog partition mappings are written. 
+ */ +class KafkaCheckpointManager( + clientId: String, + checkpointTopic: String, + val systemName: String, + replicationFactor: Int, + socketTimeout: Int, + bufferSize: Int, + fetchSize: Int, + val metadataStore: TopicMetadataStore, + connectProducer: () => Producer[Array[Byte], Array[Byte]], + val connectZk: () => ZkClient, + systemStreamPartitionGrouperFactoryString: String, + val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, + serde: CheckpointSerde = new CheckpointSerde, + checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging { + import org.apache.samza.checkpoint.kafka.KafkaCheckpointManager._ + + var taskNames = Set[TaskName]() + var producer: Producer[Array[Byte], Array[Byte]] = null + var taskNamesToOffsets: Map[TaskName, Checkpoint] = null + + var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint + val kafkaUtil: KafkaUtil = new KafkaUtil(retryBackoff, connectZk) + + KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString) + + info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName)) + + /** + * Write Checkpoint for specified taskName to log + * + * @param taskName Specific Samza taskName of which to write a checkpoint of. + * @param checkpoint Reference to a Checkpoint object to store offset data in. + **/ + override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { + val key = KafkaCheckpointLogKey.getCheckpointKey(taskName) + val keyBytes = key.toBytes() + val msgBytes = serde.toBytes(checkpoint) + retryBackoff.run( + loop => { + if (producer == null) { + producer = connectProducer() + } + + producer.send(new ProducerRecord(checkpointTopic, 0, keyBytes, msgBytes)).get() + loop.done + }, + + (exception, loop) => { + warn("Failed to write %s partition entry %s: %s. Retrying." 
format(CHECKPOINT_LOG4J_ENTRY, key, exception)) + debug("Exception detail:", exception) + if (producer != null) { + producer.close + } + producer = null + } + ) + } + + private def getConsumer(): SimpleConsumer = { + val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics)) + val metadata = metadataMap(checkpointTopic) + val partitionMetadata = metadata.partitionsMetadata + .filter(_.partitionId == 0) + .headOption + .getOrElse(throw new KafkaUtilException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka.")) + val leader = partitionMetadata + .leader + .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic)) + + info("Connecting to leader %s:%d for topic %s and to fetch all checkpoint messages." format(leader.host, leader.port, checkpointTopic)) + + new SimpleConsumer(leader.host, leader.port, socketTimeout, bufferSize, clientId) + } + + private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1) + + private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = { + val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))) + .partitionErrorAndOffsets + .get(topicAndPartition) + .getOrElse(throw new KafkaUtilException("Unable to find offset information for %s:0" format checkpointTopic)) + // Fail or retry if there was an an issue with the offset request. 
+ KafkaUtil.maybeThrowException(offsetResponse.error) + + val offset: Long = offsetResponse + .offsets + .headOption + .getOrElse(throw new KafkaUtilException("Got response, but no offsets defined for %s:0" format checkpointTopic)) + + offset + } + + /** + * Read the last checkpoint for specified TaskName + * + * @param taskName Specific Samza taskName for which to get the last checkpoint of. + **/ + override def readLastCheckpoint(taskName: TaskName): Checkpoint = { + if (!taskNames.contains(taskName)) { + throw new SamzaException(taskName + " not registered with this CheckpointManager") + } + + info("Reading checkpoint for taskName " + taskName) + + if (taskNamesToOffsets == null) { + info("No TaskName to checkpoint mapping provided. Reading for first time.") + taskNamesToOffsets = readCheckpointsFromLog() + } else { + info("Already existing checkpoint mapping. Merging new offsets") + taskNamesToOffsets ++= readCheckpointsFromLog() + } + + val checkpoint = taskNamesToOffsets.get(taskName).getOrElse(null) + + info("Got checkpoint state for taskName %s: %s" format(taskName, checkpoint)) + + checkpoint + } + + /** + * Read through entire log, discarding changelog mapping, and building map of TaskNames to Checkpoints + */ + def readCheckpointsFromLog(): Map[TaskName, Checkpoint] = { + val checkpoints = mutable.Map[TaskName, Checkpoint]() + + def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isCheckpointKey + + def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = { + val taskName = checkpointKey.getCheckpointTaskName + val checkpoint = serde.fromBytes(Utils.readBytes(payload)) + debug("Adding checkpoint " + checkpoint + " for taskName " + taskName) + checkpoints.put(taskName, checkpoint) // replacing any existing, older checkpoints as we go + } + + readLog(CHECKPOINT_LOG4J_ENTRY, shouldHandleEntry, handleCheckpoint) + checkpoints.toMap /* of the immutable kind */ + } + + + /** + * Common code for reading both changelog partition 
mapping and change log + * + * @param entryType What type of entry to look for within the log key's + * @param handleEntry Code to handle an entry in the log once it's found + */ + private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean, + handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = { + retryBackoff.run[Unit]( + loop => { + val consumer = getConsumer() + + val topicAndPartition = new TopicAndPartition(checkpointTopic, 0) + + try { + var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition)) + + info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType)) + + val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime) + + info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic)) + + if (offset < 0) { + info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic)) + return + } + + while (offset < latestOffset) { + val request = new FetchRequestBuilder() + .addFetch(checkpointTopic, 0, offset, fetchSize) + .maxWait(500) + .minBytes(1) + .clientId(clientId) + .build + + val fetchResponse = consumer.fetch(request) + if (fetchResponse.hasError) { + warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0))) + val errorCode = fetchResponse.errorCode(checkpointTopic, 0) + if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) { + warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." 
format (entryType, checkpointTopic)) + return + } + KafkaUtil.maybeThrowException(errorCode) + } + + for (response <- fetchResponse.messageSet(checkpointTopic, 0)) { + offset = response.nextOffset + startingOffset = Some(offset) // For next time we call + + if (!response.message.hasKey) { + throw new KafkaUtilException("Encountered message without key.") + } + + val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key)) + + if (!shouldHandleEntry(checkpointKey)) { + debug("Skipping " + entryType + " entry with key " + checkpointKey) + } else { + handleEntry(response.message.payload, checkpointKey) + } + } + } + } finally { + consumer.close() + } + + loop.done + Unit + }, + + (exception, loop) => { + exception match { + case e: InvalidMessageException => throw new KafkaUtilException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e) + case e: InvalidMessageSizeException => throw new KafkaUtilException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e) + case e: UnknownTopicOrPartitionException => throw new KafkaUtilException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e) + case e: KafkaUtilException => throw e + case e: Exception => + warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." 
format(entryType, checkpointTopic, e)) + debug("Exception detail:", e) + } + } + ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic)) + + } + + def start { + kafkaUtil.createTopic(checkpointTopic, 1, replicationFactor, checkpointTopicProperties) + kafkaUtil.validateTopicPartitionCount(checkpointTopic, systemName, metadataStore, 1) + } + + def register(taskName: TaskName) { + debug("Adding taskName " + taskName + " to " + this) + taskNames += taskName + } + + def stop = { + if (producer != null) { + producer.close + } + } + + + /** + * Read through entire log, discarding checkpoints, finding latest changelogPartitionMapping + * To be used for Migration purpose only. In newer version, changelogPartitionMapping will be handled through coordinator stream + */ + @Deprecated + def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = { + var changelogPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]() + + def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isChangelogPartitionMapping + + def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = { + changelogPartitionMapping = serde.changelogPartitionMappingFromBytes(Utils.readBytes(payload)) + + debug("Adding changelog partition mapping" + changelogPartitionMapping) + } + + readLog(CHANGELOG_PARTITION_MAPPING_LOG4j, shouldHandleEntry, handleCheckpoint) + + changelogPartitionMapping + } + + override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic) +} + +object KafkaCheckpointManager { + val CHECKPOINT_LOG4J_ENTRY = "checkpoint log" + val CHANGELOG_PARTITION_MAPPING_LOG4j = "changelog partition mapping" +} http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala 
---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala new file mode 100644 index 0000000..7db8940 --- /dev/null +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.checkpoint.kafka + +import java.util.Properties + +import kafka.utils.ZKStringSerializer +import org.I0Itec.zkclient.ZkClient +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.samza.SamzaException +import org.apache.samza.checkpoint.{CheckpointManager, CheckpointManagerFactory} +import org.apache.samza.config.JobConfig.Config2Job +import org.apache.samza.config.KafkaConfig.Config2Kafka +import org.apache.samza.config.{Config, KafkaConfig} +import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging} + +object KafkaCheckpointManagerFactory { + val INJECTED_PRODUCER_PROPERTIES = Map( + "acks" -> "all", + // Forcibly disable compression because Kafka doesn't support compression + // on log compacted topics. Details in SAMZA-586. + "compression.type" -> "none") + + // Set the checkpoint topic configs to have a very small segment size and + // enable log compaction. This keeps job startup time small since there + // are fewer useless (overwritten) messages to read from the checkpoint + // topic. 
+ def getCheckpointTopicProperties(config: Config) = { + val segmentBytes: Int = if (config == null) { + KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES + } else { + config.getCheckpointSegmentBytes() + } + (new Properties /: Map( + "cleanup.policy" -> "compact", + "segment.bytes" -> String.valueOf(segmentBytes))) { case (props, (k, v)) => props.put(k, v); props } + } +} + +class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging { + import org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory._ + + def getCheckpointManager(config: Config, registry: MetricsRegistry): CheckpointManager = { + val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config) + val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs")) + val jobId = config.getJobId.getOrElse("1") + + val systemName = config + .getCheckpointSystem + .getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager.")) + + val producerConfig = config.getKafkaSystemProducerConfig( + systemName, + clientId, + INJECTED_PRODUCER_PROPERTIES) + val connectProducer = () => { + new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) + } + + val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId) + val zkConnect = Option(consumerConfig.zkConnect) + .getOrElse(throw new SamzaException("no zookeeper.connect defined in config")) + val connectZk = () => { + new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer) + } + val socketTimeout = consumerConfig.socketTimeoutMs + + + new KafkaCheckpointManager( + clientId, + KafkaUtil.getCheckpointTopic(jobName, jobId), + systemName, + config.getCheckpointReplicationFactor.getOrElse("3").toInt, + socketTimeout, + consumerConfig.socketReceiveBufferBytes, + consumerConfig.fetchMessageMaxBytes, // must be > buffer size + new ClientUtilTopicMetadataStore(producerConfig.bootsrapServers, clientId, socketTimeout), + connectProducer, + 
connectZk, + config.getSystemStreamPartitionGrouperFactory, // To find out the SSPGrouperFactory class so it can be included/verified in the key + checkpointTopicProperties = getCheckpointTopicProperties(config)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 798033c..a65e8e8 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -40,6 +40,10 @@ object KafkaConfig { val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system" val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config" + val CHECKPOINT_SYSTEM = "task.checkpoint.system" + val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor" + val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes" + val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor" val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka." 
// The default segment size to use for changelog topics @@ -54,10 +58,16 @@ object KafkaConfig { */ val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold" + val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400 + implicit def Config2Kafka(config: Config) = new KafkaConfig(config) } class KafkaConfig(config: Config) extends ScalaMapConfig(config) { + // checkpoints + def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM) + def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR) + def getCheckpointSegmentBytes() = getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES) // custom consumer config def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name) http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala b/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala new file mode 100644 index 0000000..c6b1fe4 --- /dev/null +++ b/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.migration + +import kafka.utils.ZKStringSerializer +import org.I0Itec.zkclient.ZkClient +import org.apache.samza.SamzaException +import org.apache.samza.checkpoint.kafka.{KafkaCheckpointManager, KafkaCheckpointManagerFactory} +import org.apache.samza.config.Config +import org.apache.samza.config.JobConfig.Config2Job +import org.apache.samza.config.KafkaConfig.Config2Kafka +import org.apache.samza.coordinator.stream.messages.{CoordinatorStreamMessage, SetMigrationMetaMessage} +import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemFactory, CoordinatorStreamSystemProducer} +import org.apache.samza.metrics.MetricsRegistryMap +import org.apache.samza.storage.ChangelogPartitionManager +import org.apache.samza.util._ + +/** + * Migrates changelog partition mapping from checkpoint topic to coordinator stream + */ +class KafkaCheckpointMigration extends MigrationPlan with Logging { + val source = "CHECKPOINTMIGRATION" + val migrationKey = "CheckpointMigration09to10" + val migrationVal = "true" + + var connectZk: () => ZkClient = null + + private def getCheckpointSystemName(config: Config): String = { + config + .getCheckpointSystem + .getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager.")) + } + + private def getClientId(config: Config): String = { + KafkaUtil.getClientId("samza-checkpoint-manager", config) + } + + private def getTopicMetadataStore(config: Config): TopicMetadataStore = { + val clientId = getClientId(config) + val systemName = 
getCheckpointSystemName(config) + + val producerConfig = config.getKafkaSystemProducerConfig( + systemName, + clientId, + KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES) + + val consumerConfig = config.getKafkaSystemConsumerConfig( + systemName, + clientId) + + new ClientUtilTopicMetadataStore(producerConfig.bootsrapServers, clientId, consumerConfig.socketTimeoutMs) + } + + private def getConnectZk(config: Config): () => ZkClient = { + val clientId = getClientId(config) + + val checkpointSystemName = getCheckpointSystemName(config) + + val consumerConfig = config.getKafkaSystemConsumerConfig( + checkpointSystemName, + clientId) + + val zkConnectString = Option(consumerConfig.zkConnect) + .getOrElse(throw new SamzaException("no zookeeper.connect defined in config")) + () => { + new ZkClient(zkConnectString, 6000, 6000, ZKStringSerializer) + } + } + + override def migrate(config: Config) { + val jobName = config.getName.getOrElse(throw new SamzaException("Cannot find job name. 
Cannot proceed with migration.")) + val jobId = config.getJobId.getOrElse("1") + + val checkpointTopicName = KafkaUtil.getCheckpointTopic(jobName, jobId) + + val coordinatorSystemFactory = new CoordinatorStreamSystemFactory + val coordinatorSystemConsumer = coordinatorSystemFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap) + val coordinatorSystemProducer = coordinatorSystemFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap) + + val checkpointManager = new KafkaCheckpointManagerFactory().getCheckpointManager(config, new NoOpMetricsRegistry).asInstanceOf[KafkaCheckpointManager] + + val kafkaUtil = new KafkaUtil(new ExponentialSleepStrategy, getConnectZk(config)) + + // make sure to validate that we only perform migration when checkpoint topic exists + if (kafkaUtil.topicExists(checkpointTopicName)) { + kafkaUtil.validateTopicPartitionCount( + checkpointTopicName, + getCheckpointSystemName(config), + getTopicMetadataStore(config), + 1) + + if (migrationVerification(coordinatorSystemConsumer)) { + info("Migration %s was already performed, doing nothing" format migrationKey) + return + } + + info("No previous migration for %s were detected, performing migration" format migrationKey) + + info("Loading changelog partition mapping from checkpoint topic - %s" format checkpointTopicName) + val changelogMap = checkpointManager.readChangeLogPartitionMapping() + checkpointManager.stop + + info("Writing changelog to coordinator stream topic - %s" format Util.getCoordinatorStreamName(jobName, jobId)) + val changelogPartitionManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, source) + changelogPartitionManager.start() + changelogPartitionManager.writeChangeLogPartitionMapping(changelogMap) + changelogPartitionManager.stop() + } + migrationCompletionMark(coordinatorSystemProducer) + + } + + def migrationVerification(coordinatorSystemConsumer : CoordinatorStreamSystemConsumer): Boolean = { + 
coordinatorSystemConsumer.register() + coordinatorSystemConsumer.start() + coordinatorSystemConsumer.bootstrap() + val stream = coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE) + val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal) + stream.contains(message.asInstanceOf[CoordinatorStreamMessage]) /* true iff the bootstrapped SetMigrationMetaMessage stream already holds a message equal to (source, migrationKey, migrationVal). NOTE(review): the consumer is not stopped after bootstrap() here; confirm the caller stops it. */ + } + + def migrationCompletionMark(coordinatorSystemProducer: CoordinatorStreamSystemProducer) = { /* Records completion of this migration by sending one SetMigrationMetaMessage(source, migrationKey, migrationVal) through the given producer, which is started just before and stopped just after the send. */ + info("Marking completion of migration %s" format migrationKey) + val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal) + coordinatorSystemProducer.start() + coordinatorSystemProducer.send(message) + coordinatorSystemProducer.stop() + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala index a7a095b..f4311d1 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala @@ -19,16 +19,24 @@ package org.apache.samza.util +import java.util.Properties import java.util.concurrent.atomic.AtomicLong +import kafka.admin.AdminUtils +import org.I0Itec.zkclient.ZkClient +import org.apache.kafka.clients.producer.{Producer, ProducerRecord} import org.apache.kafka.common.PartitionInfo import org.apache.samza.config.Config import org.apache.samza.config.ConfigException import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.system.OutgoingMessageEnvelope -import kafka.common.ErrorMapping -import kafka.common.ReplicaNotAvailableException +import kafka.common.{TopicExistsException, ErrorMapping, ReplicaNotAvailableException} +import org.apache.samza.system.kafka.TopicMetadataCache object KafkaUtil extends Logging { + /**
+ * Version number to track the format of the checkpoint log + */ + val CHECKPOINT_LOG_VERSION_NUMBER = 1 val counter = new AtomicLong(0) def getClientId(id: String, config: Config): String = getClientId( @@ -51,6 +59,9 @@ object KafkaUtil extends Logging { abs(envelope.getPartitionKey.hashCode()) % numPartitions } + /* Returns "__samza_checkpoint_ver_<CHECKPOINT_LOG_VERSION_NUMBER>_for_<jobName>_<jobId>", with every '_' in jobName and jobId replaced by '-' (presumably so the '_' separators remain unambiguous). */ def getCheckpointTopic(jobName: String, jobId: String): String = + "__samza_checkpoint_ver_%d_for_%s_%s" format (CHECKPOINT_LOG_VERSION_NUMBER, jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-")) + /** * Exactly the same as Kafka's ErrorMapping.maybeThrowException * implementation, except suppresses ReplicaNotAvailableException exceptions. @@ -75,3 +86,102 @@ object KafkaUtil extends Logging { code != ErrorMapping.NoError && code != ErrorMapping.ReplicaNotAvailableCode } } + +class KafkaUtil(val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, + val connectZk: () => ZkClient) extends Logging { /* createTopic and topicExists call connectZk() once per ZooKeeper attempt and close that client in a finally block. */ + /** + * Common code for creating a topic in Kafka + * + * @param topicName Name of the topic to be created + * @param partitionCount Number of partitions in the topic + * @param replicationFactor Number of replicas for the topic + * @param topicProperties Any topic related properties + */ + def createTopic(topicName: String, partitionCount: Int, replicationFactor: Int, topicProperties: Properties = new Properties): Unit = { + info("Attempting to create topic %s." format topicName) + retryBackoff.run( + loop => { + val zkClient = connectZk() + try { + AdminUtils.createTopic( + zkClient, + topicName, + partitionCount, + replicationFactor, + topicProperties) + } finally { + zkClient.close() + } + + info("Created topic %s." format topicName) + loop.done + }, + + (exception, loop) => { /* TopicExistsException counts as success and ends the loop; any other Exception is logged and, per the "Retrying." message, the attempt is retried by retryBackoff. */ + exception match { + case _: TopicExistsException => + info("Topic %s already exists." format topicName) + loop.done + case e: Exception => + warn("Failed to create topic %s: %s. Retrying."
format(topicName, e)) + debug("Exception detail:", e) + } + } + ) + } + + /** + * Common code to validate partition count in a topic + * + * @param topicName Name of the topic to be validated + * @param systemName Kafka system to use + * @param metadataStore Topic Metadata store + * @param expectedPartitionCount Expected number of partitions; a mismatch raises KafkaUtilException, which is not retried + */ + def validateTopicPartitionCount(topicName: String, + systemName: String, + metadataStore: TopicMetadataStore, + expectedPartitionCount: Int): Unit = { + info("Validating topic %s. Expecting partition count: %d" format (topicName, expectedPartitionCount)) + retryBackoff.run( + loop => { + val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo) + val topicMetadata = topicMetadataMap(topicName) + KafkaUtil.maybeThrowException(topicMetadata.errorCode) + + val partitionCount = topicMetadata.partitionsMetadata.length + if (partitionCount != expectedPartitionCount) { + throw new KafkaUtilException("Validation failed for topic %s because partition count %s did not " + "match expected partition count of %d." format(topicName, partitionCount, expectedPartitionCount)) + } + + info("Successfully validated topic %s." format topicName) + loop.done + }, + + (exception, loop) => { /* KafkaUtilException (partition-count mismatch) is a hard failure and is rethrown; any other Exception is logged and, per the "Retrying." message, retried by retryBackoff. */ + exception match { + case e: KafkaUtilException => throw e + case e: Exception => + warn("While trying to validate topic %s: %s. Retrying." format(topicName, e)) + debug("Exception detail:", e) + } + } + ) + } + + /** + * Code to verify that a topic exists + * + * @param topicName Name of the topic + * @return true if AdminUtils.topicExists reports the topic as existing, false otherwise.
+ */ + def topicExists(topicName: String): Boolean = { /* Opens a fresh ZkClient for this one check and always closes it; unlike createTopic, no retryBackoff is applied. */ + val zkClient = connectZk() + try { + AdminUtils.topicExists(zkClient, topicName) + } finally { + zkClient.close() + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/eba9b28f/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtilException.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtilException.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtilException.scala new file mode 100644 index 0000000..b9c5dd7 --- /dev/null +++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtilException.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.util + +import org.apache.samza.SamzaException + +/** + * KafkaCheckpointManager handles retries, so we need two kinds of exceptions: + * one to signal a hard failure, and the other to retry. The + * KafkaUtilException is thrown to indicate a hard failure that the Kafka + * CheckpointManager can't recover from.
+ */ +class KafkaUtilException(val message: String, t: Throwable) extends SamzaException(message, t) { /* message is read-only: a public var could be reassigned without changing the message already handed to SamzaException at construction. */ + /* Auxiliary constructor for a hard failure with no underlying cause (the cause is passed as null). */ def this(s: String) = this(s, null) +} \ No newline at end of file
