Repository: incubator-samza Updated Branches: refs/heads/master 156d4a4f7 -> 47edbc3a2
SAMZA-64; Fail KafkaCheckpointManager on unrecoverable errors Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/47edbc3a Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/47edbc3a Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/47edbc3a Branch: refs/heads/master Commit: 47edbc3a28a269269f2ef4494b2f6c89a6c46450 Parents: 156d4a4 Author: Yan Fang <[email protected]> Authored: Tue Apr 22 12:31:53 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Apr 22 12:31:53 2014 -0700 ---------------------------------------------------------------------- .../kafka/KafkaCheckpointManager.scala | 6 +++ .../kafka/TestKafkaCheckpointManager.scala | 48 ++++++++++++++++++++ 2 files changed, 54 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/47edbc3a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index fed6eee..62c91e8 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -28,6 +28,8 @@ import kafka.api.PartitionOffsetRequestInfo import kafka.common.ErrorMapping import kafka.common.TopicAndPartition import kafka.common.TopicExistsException +import kafka.common.InvalidMessageSizeException +import kafka.common.UnknownTopicOrPartitionException import kafka.consumer.SimpleConsumer import kafka.producer.KeyedMessage import kafka.producer.Partitioner @@ -36,6 +38,7 @@ import kafka.serializer.Decoder import 
kafka.serializer.Encoder import kafka.utils.Utils import kafka.utils.VerifiableProperties +import kafka.message.InvalidMessageException import org.apache.samza.Partition import org.apache.samza.SamzaException import org.apache.samza.checkpoint.Checkpoint @@ -181,6 +184,9 @@ class KafkaCheckpointManager( (exception, loop) => { exception match { + case e: InvalidMessageException => throw new KafkaCheckpointException ("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e) + case e: InvalidMessageSizeException => throw new KafkaCheckpointException ("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e) + case e: UnknownTopicOrPartitionException => throw new KafkaCheckpointException ("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e) case e: KafkaCheckpointException => throw e case e: Exception => warn("While trying to read last checkpoint for topic %s and partition %s: %s. Retrying." 
format (checkpointTopic, partition, e)) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/47edbc3a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index f1a8f8a..92ac61e 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -39,8 +39,12 @@ import scala.collection.JavaConversions._ import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore } import org.apache.samza.config.MapConfig import org.apache.samza.checkpoint.Checkpoint +import org.apache.samza.serializers.CheckpointSerde import org.apache.samza.system.SystemStream import kafka.utils.ZKStringSerializer +import kafka.message.InvalidMessageException +import kafka.common.InvalidMessageSizeException +import kafka.common.UnknownTopicOrPartitionException object TestKafkaCheckpointManager { val zkConnect: String = TestZKUtils.zookeeperConnect @@ -129,6 +133,25 @@ class TestKafkaCheckpointManager { kcm.stop } + @Test + def testUnrecovableKafkaErrorShouldThrowKafkaCheckpointManagerException { + val exceptions = List("InvalidMessageException", "InvalidMessageSizeException", "UnknownTopicOrPartitionException") + exceptions.foreach { exceptionName => + val kcm = getKafkaCheckpointManagerWithInvalidSerde(exceptionName) + kcm.register(partition) + kcm.start + kcm.writeCheckpoint(partition, cp1) + // because serde will throw unrecoverable errors, it should result in a KafkaCheckpointException + try { + val readCpInvalide = kcm.readLastCheckpoint(partition) + fail("Expected a KafkaCheckpointException.") + } catch { + case e: 
KafkaCheckpointException => None + } + kcm.stop + } + } + private def getKafkaCheckpointManager = new KafkaCheckpointManager( clientId = "some-client-id", checkpointTopic = "checkpoint-topic", @@ -141,4 +164,29 @@ class TestKafkaCheckpointManager { metadataStore = metadataStore, connectProducer = () => new Producer[Partition, Array[Byte]](producerConfig), connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) + + // inject serde. Kafka exceptions will be thrown when serde.fromBytes is called + private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = new KafkaCheckpointManager( + clientId = "some-client-id-invalid-serde", + checkpointTopic = "checkpoint-topic-invalid-serde", + systemName = "kafka", + totalPartitions = 1, + replicationFactor = 3, + socketTimeout = 30000, + bufferSize = 64 * 1024, + fetchSize = 300 * 1024, + metadataStore = metadataStore, + connectProducer = () => new Producer[Partition, Array[Byte]](producerConfig), + connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), + serde = new InvalideSerde(exception)) + + class InvalideSerde(exception: String) extends CheckpointSerde { + override def fromBytes(bytes: Array[Byte]): Checkpoint = { + exception match { + case "InvalidMessageException" => throw new InvalidMessageException + case "InvalidMessageSizeException" => throw new InvalidMessageSizeException + case "UnknownTopicOrPartitionException" => throw new UnknownTopicOrPartitionException + } + } + } }
